1 - wallaroo.assay

class Assay(wallaroo.object.Object):

An Assay represents a record in the database. An assay contains some high level attributes such as name, status, active, etc. as well as the sub objects Baseline, Window and Summarizer which specify how the Baseline is derived, how the Windows should be created and how the analysis should be conducted.

Assay(client: Optional[wallaroo.client.Client], data: Dict[str, Any])

Base constructor.

Each object requires:

  • a GraphQL client - in order to fill its missing members dynamically
  • an initial data blob - typically from unserialized JSON, contains at
  • least the data for required members (typically the object's primary key) and optionally other data members.
def turn_on(self):

Sets the Assay to active causing it to run and backfill any missing analysis.

def turn_off(self):

Disables the Assay. No further analysis will be conducted until the assay is enabled.

def set_alert_threshold(self, threshold: float):

Sets the alert threshold at the specified level. The status in the AssayAnalysis will show if this level is exceeded however currently alerting/notifications are not implemented.

def set_warning_threshold(self, threshold: float):

Sets the warning threshold at the specified level. The status in the AssayAnalysis will show if this level is exceeded however currently alerting/notifications are not implemented.

def meta_df(assay_result: Dict, index_name) -> pandas.core.frame.DataFrame:

Creates a dataframe for the meta data in the baseline or window excluding the edge information.

Parameters
  • assay_result: The dict of the raw asset result
def edge_df(window_or_baseline: Dict) -> pandas.core.frame.DataFrame:

Creates a dataframe specifically for the edge information in the baseline or window.

Parameters
  • window_or_baseline: The dict from the assay result of either the window or baseline
class AssayAnalysis:

The AssayAnalysis class helps handle the assay analysis logs from the Plateau logs. These logs are a json document with meta information on the assay and analysis as well as summary information on the baseline and window and information on the comparison between them.

AssayAnalysis(raw: Dict[str, Any])
def compare_basic_stats(self) -> pandas.core.frame.DataFrame:

Creates a simple dataframe making it easy to compare a baseline and window.

def baseline_stats(self) -> pandas.core.frame.DataFrame:

Creates a simple dataframe with the basic stats data for a baseline.

def compare_bins(self) -> pandas.core.frame.DataFrame:

Creates a simple dataframe to compare the bin/edge information of baseline and window.

def baseline_bins(self) -> pandas.core.frame.DataFrame:

Creates a simple dataframe to with the edge/bin data for a baseline.

def chart(self, show_scores=True):

Quickly create a chart showing the bins, values and scores of an assay analysis. show_scores will also label each bin with its final weighted (if specified) score.

class AssayAnalysisList:

Helper class primarily to easily create a dataframe from a list of AssayAnalysis objects.

AssayAnalysisList(raw: List[wallaroo.assay.AssayAnalysis])
def to_dataframe(self) -> pandas.core.frame.DataFrame:

Creates and returns a summary dataframe from the assay results.

def to_full_dataframe(self) -> pandas.core.frame.DataFrame:

Creates and returns a dataframe with all values including inputs and outputs from the assay results.

def chart_df( self, df: Union[pandas.core.frame.DataFrame, pandas.core.series.Series], title: str, nth_x_tick=None):

Creates a basic chart of the scores from dataframe created from assay analysis list

def chart_scores(self, title: Optional[str] = None, nth_x_tick=4):

Creates a basic chart of the scores from an AssayAnalysisList

def chart_iopaths( self, labels: Optional[List[str]] = None, selected_labels: Optional[List[str]] = None, nth_x_tick=None):

Creates a basic charts of the scores for each unique iopath of an AssayAnalysisList

class Assays(typing.List[wallaroo.assay.Assay]):

Wraps a list of assays for display in an HTML display-aware environment like Jupyter.

Inherited Members
builtins.list
list
clear
copy
append
insert
extend
pop
remove
index
count
reverse
sort

2 - wallaroo.assay_config

def unwrap(v: Optional[~T]) -> ~T:

Simple function to placate pylance

class BaselineConfig:

Abstract base class for Baseline config objects. Currently only FixedBaseline is implemented though SlidingBaseline and others are planned.

BaselineConfig()
def to_json(self) -> str:
class FixedBaseline(BaselineConfig):

The FixedBaseline is calculate from the inferences from a specific time window.

FixedBaseline( pipeline_name: str, model_name: str, start: datetime.datetime, end: datetime.datetime)
Inherited Members
BaselineConfig
to_json
class BaselineBuilder(abc.ABC):

Helper class that provides a standard way to create an ABC using inheritance.

@abstractmethod
def build(self) -> wallaroo.assay_config.BaselineConfig:
def to_json(self) -> str:
def ensure_tz(d: datetime.datetime) -> datetime.datetime:

Ensure the date it tz aware. If naive assume it is in utc.

class FixedBaselineBuilder(BaselineBuilder):

Helps to easily create the config object for a FixedBaseline.

FixedBaselineBuilder(pipeline_name: str)
def add_model_name(self, model_name: str):

Specify the model to use in the baseline

def add_start(self, start: datetime.datetime):

Specify the start of the window for the baseline

def add_end(self, end: datetime.datetime):

Specify the end of the window for the baseline

def build(self) -> wallaroo.assay_config.FixedBaseline:

Create the FixedBaseline object.

Inherited Members
BaselineBuilder
to_json
class SummarizerConfig:

The summarizer specifies how the bins of the baseline and window should be compared.

SummarizerConfig()
def to_json(self) -> str:
class BinMode(builtins.str, enum.Enum):

How should we calculate the bins. NONE - no bins. Only useful if we only care about the mean, median, etc. EQUAL - evenly spaced bins: min - max / num_bins QUANTILE - based on percentages. If num_bins is 5 then quantiles so bins are created at the 20%, 40%, 60%, 80% and 100% points. PROVIDED - user provides the edge points for the bins.

NONE = <BinMode.NONE: 'None'>
EQUAL = <BinMode.EQUAL: 'Equal'>
QUANTILE = <BinMode.QUANTILE: 'Quantile'>
PROVIDED = <BinMode.PROVIDED: 'Provided'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class Aggregation(builtins.str, enum.Enum):

What we use to calculate the score. EDGES - distnces between the edges. DENSITY - percentage of values that fall in each bin. CUMULATIVE - cumulative percentage that fall in the bins.

EDGES = <Aggregation.EDGES: 'Edges'>
DENSITY = <Aggregation.DENSITY: 'Density'>
CUMULATIVE = <Aggregation.CUMULATIVE: 'Cumulative'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class Metric(builtins.str, enum.Enum):

How we calculate the score. MAXDIFF - maximum difference between corresponding bins. SUMDIFF - sum of differences between corresponding bins. PSI - Population Stability Index

MAXDIFF = <Metric.MAXDIFF: 'MaxDiff'>
SUMDIFF = <Metric.SUMDIFF: 'SumDiff'>
PSI = <Metric.PSI: 'PSI'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class UnivariateContinousSummarizerConfig(SummarizerConfig):

The UnivariateContinousSummarizer analyizes one input or output feature (Univariate) at a time. Expects the values to be continous or at least numerous enough to fall in various/all the bins.

UnivariateContinousSummarizerConfig( bin_mode: wallaroo.assay_config.BinMode, aggregation: wallaroo.assay_config.Aggregation, metric: wallaroo.assay_config.Metric, num_bins: int, bin_weights: Optional[List[float]] = None, bin_width: Optional[float] = None, provided_edges: Optional[List[float]] = None, add_outlier_edges: bool = True)
Inherited Members
SummarizerConfig
to_json
class SummarizerBuilder(abc.ABC):

Helper class that provides a standard way to create an ABC using inheritance.

@abstractmethod
def build(self) -> wallaroo.assay_config.SummarizerConfig:
class UnivariateContinousSummarizerBuilder(SummarizerBuilder):

Builds the UnviariateSummarizer

UnivariateContinousSummarizerBuilder()
def build(self) -> wallaroo.assay_config.UnivariateContinousSummarizerConfig:
def add_bin_mode( self, bin_mode: wallaroo.assay_config.BinMode, edges: Optional[List[float]] = None):

Sets the binning mode. If BinMode.PROVIDED is specified a list of edges is also required.

def add_num_bins(self, num_bins: int):

Sets the number of bins. If weights have been previously set they must be set to none to allow changing the number of bins.

def add_bin_weights(self, weights: Optional[List[float]]):

Specifies the weighting to be given to the bins. The number of weights must be 2 larger than the number of bins to accomodate outliers smaller and outliers larger than values seen in the baseline. The passed in values can be whole or real numbers and do not need to add up to 1 or any other specific value as they will be normalized during the score calculation phase. The weights passed in can be none to remove previously specified weights and to allow changing of the number of bins.

def add_metric(self, metric: wallaroo.assay_config.Metric):

Sets the metric mode.

def add_aggregation(self, aggregation: wallaroo.assay_config.Aggregation):

Sets the aggregation style.

def add_bin_edges(self, edges: Optional[List[float]]):

Specifies the right hand side (max value) of the bins. The number of edges must be equal to or one more than the number of bins. When equal to the number of bins the edge for the left outlier bin is calculated from the baseline. When an additional edge (one more than number of bins) that first (lower) value is used as the max value for the left outlier bin. The max value for the right hand outlier bin is always Float MAX.

class WindowConfig:

Configures a window to be compared against the baseline.

WindowConfig( pipeline_name: str, model_name: str, width: str, start: Optional[datetime.datetime] = None, interval: Optional[str] = None)
def to_json(self) -> str:
class WindowBuilder:

Helps build a WindowConfig. model and width are required but there are no good default values for them because they depend on the baseline. We leave it up to the assay builder to configure the window correctly after it is created.

WindowBuilder(pipeline_name: str)
def add_model_name(self, model_name: str):

The model name (model_id) that the window should analyze.

def add_width(self, **kwargs: int):

The width of the window to use when collecting data for analysis.

def add_interval(self, **kwargs: int):

The width of the window to use when collecting data for analysis.

def add_start(self, start: datetime.datetime):
def build(self) -> wallaroo.assay_config.WindowConfig:
def ConfigEncoder(o):

Used to format datetimes as we need when encoding to JSON

class AssayConfig:

Configuration for an Assay record.

AssayConfig( client: Optional[wallaroo.client.Client], name: str, pipeline_id: int, pipeline_name: str, active: bool, status: str, iopath: str, baseline: wallaroo.assay_config.BaselineConfig, window: wallaroo.assay_config.WindowConfig, summarizer: wallaroo.assay_config.SummarizerConfig, warning_threshold: Optional[float], alert_threshold: float, run_until: Optional[datetime.datetime], workspace_id: Optional[int])
def to_json(self) -> str:
def interactive_run(self) -> wallaroo.assay.AssayAnalysisList:

Runs this assay interactively. The assay is not saved to the database nor are analyis records saved to a Plateau topic. Useful for exploring pipeline inference data and experimenting with thresholds.

def interactive_baseline_run(self) -> Optional[wallaroo.assay.AssayAnalysis]:
def interactive_input_run( self, inferences: List[Dict], labels: Optional[List[str]]) -> wallaroo.assay.AssayAnalysisList:

Analyzes the inputs given to create an interactive run for each feature column. The assay is not saved to the database nor are analyis records saved to a Plateau topic. Usefull for exploring inputs for possible causes when a difference is detected in the output.

class AssayBuilder:

Helps build an AssayConfig

AssayBuilder( client: Optional[wallaroo.client.Client], name: str, pipeline_id: int, pipeline_name: str, model_name: str, baseline_start: datetime.datetime, baseline_end: datetime.datetime)
def baseline_dataframe(self):
def baseline_histogram( self, bins: Union[int, str, NoneType] = None, log_scale: bool = False):
def baseline_kde(self, log_scale: bool = False):
def baseline_ecdf(self, log_scale: bool = False):
def build(self) -> wallaroo.assay_config.AssayConfig:
def upload(self) -> int:
def add_name(self, name: str):

Specify the assay name

def add_active(self, active: bool):

Specify if the assay is active or not

def add_iopath(self, iopath: str):

Specify what the assay should analyze. Should start with input or output and have indexes (zero based) into row and column: For example 'input 0 1' specifies the second column of the first input.

def fixed_baseline_builder(self):

Specify creates a fixed baseline builder for this assay builder.

def add_baseline(self, baseline: wallaroo.assay_config.BaselineConfig):

Adds a specific baseline created elsewhere.

def window_builder(self):

Returns this assay builders window builder.

def add_window(self, window: wallaroo.assay_config.WindowConfig):

Adds a window created elsewhere.

def univariate_continuous_summarizer(self) -> wallaroo.assay_config.UnivariateContinousSummarizerBuilder:

Creates and adds an UCS to this assay builder.

def add_summarizer(self, summarizer: wallaroo.assay_config.SummarizerConfig):

Adds the summarizer created elsewhere to this builder.

def add_warning_threshold(self, warning_threshold: float):

Specify the warning threshold for this assay.

def add_alert_threshold(self, alert_threshold: float):

Specify the alert threshold for this assay.

def add_run_until(self, run_until: datetime.datetime):

"How long should this assay run. Primarily useful for interactive runs to limit the number of analysis.

def calc_bins(num_samples: int, bins: Union[int, str, NoneType]) -> Union[str, int]:

If the users specifies a number of bins or a strategy for calculating it use that. Else us the min of the square root or 50.

3 - wallaroo.auth

Handles authentication to the Wallaroo platform.

Performs a "device code"-style OAuth login flow.

The code is organized as follows:

  • Auth objects returned by create() should be placed on each request to platform APIs. Currently, we have the following types:

    • NoAuth: Does not modify requests
    • PlatformAuth: Places Authorization: Bearer XXX headers on each outgoing request
  • Objects derived from TokenFetcher know how to obtain an AccessToken from a particular provider:

    • KeycloakTokenFetcher: Fetches a token from Keycloak using a device code login flow
    • CachedTokenFetcher: Wraps another TokenFetcher and caches the value to a JSON file to reduce the number of user logins needed.
class AuthType(enum.Enum):

Defines all the supported auth types.

Handles conversions from string names to enum values.

NONE = <AuthType.NONE: 'none'>
SSO = <AuthType.SSO: 'sso'>
USER_PASSWORD = <AuthType.USER_PASSWORD: 'user_password'>
TEST_AUTH = <AuthType.TEST_AUTH: 'test_auth'>
TOKEN = <AuthType.TOKEN: 'token'>
Inherited Members
enum.Enum
name
value
class TokenData(typing.NamedTuple):

TokenData(token, user_email, user_id)

TokenData(token: str, user_email: str, user_id: str)

Create new instance of TokenData(token, user_email, user_id)

token: str

Alias for field number 0

user_email: str

Alias for field number 1

user_id: str

Alias for field number 2

def to_dict(self) -> Dict[str, str]:
Inherited Members
builtins.tuple
index
count
def create( keycloak_addr: str, auth_type: Union[wallaroo.auth.AuthType, str, NoneType]) -> wallaroo.auth._WallarooAuth:

Returns an auth object of the corresponding type.

Parameters
  • str keycloak_addr: Address of the Keycloak instance to auth against
  • AuthType or str auth_type: Type of authentication to use
Returns

Auth object that can be passed to all requests calls

Raises
  • NotImplementedError: if auth_type is not recognized
def logout():

Removes cached values for all third-party auth providers.

This will not invalidate auth objects already created with create().

class AuthError(builtins.Exception):

Base type for all errors in this module.

AuthError(message: str, code: Optional[int] = None)
Inherited Members
builtins.BaseException
with_traceback
args
class TokenFetchError(AuthError):

Errors encountered while performing a login.

Inherited Members
AuthError
AuthError
builtins.BaseException
with_traceback
args
class TokenRefreshError(AuthError):

Errors encountered while refreshing an AccessToken.

Inherited Members
AuthError
AuthError
builtins.BaseException
with_traceback
args

4 - wallaroo.checks

class Expression:

Root base class for all model-checker expressions. Provides pythonic magic-method sugar for expression definitions.

Expression()
def model_names(self):
def as_json(self):
def one_of(self, *values):
@classmethod
def from_py(cls, value):

Creates an :py:Expression: from a given python value.

def top_json(self) -> Dict[str, object]:

Creates a top-level expression that can be passed to the model checker runtime.

class Function(Expression):

Root base class for all model-checker expressions. Provides pythonic magic-method sugar for expression definitions.

Function(op, args)
def model_names(self):
def as_json(self):
Inherited Members
Expression
one_of
from_py
top_json
class BinOp(Expression):

Root base class for all model-checker expressions. Provides pythonic magic-method sugar for expression definitions.

BinOp(op, left, right)
def model_names(self):
def as_json(self):
Inherited Members
Expression
one_of
from_py
top_json
class Variable(Expression):

Declares a model variable that can be used as an :py:Expression: in the model checker. Variables are identified by their model_name, a position of either "input" or "output", and the tensor index.

Variable(model_name, position, index)
def model_names(self):
def as_json(self):
Inherited Members
Expression
one_of
from_py
top_json
def value_to_node(value):
class Value(Expression):

Root base class for all model-checker expressions. Provides pythonic magic-method sugar for expression definitions.

Value(value)
def model_names(self):
def as_json(self):
Inherited Members
Expression
one_of
from_py
top_json
def is_prom_primitive(v):
class Aggregate:
Aggregate( name: str, promql_agg: str, inner_expression: wallaroo.checks.Expression, duration: datetime.timedelta, bucket_size: Optional[datetime.timedelta])
def expression(self):
def promql(self, gauge_name):
class Alert:
Alert(op, left, right)
def promql(self, gauge_name):
class DefinedFunction:
DefinedFunction(name)
class DefinedAggregate:
DefinedAggregate(name: str, promql_agg)
class Variables:
Variables(model, position)
def instrument( values: Dict[str, wallaroo.checks.Expression], gauges: List[str], validations: List[str]):
def dns_compliant(name: str):

Returns true if a string is compliant with DNS label name requirement to ensure it can be a part of a full DNS host name

def require_dns_compliance(name: str):

Validates that 'name' complies with DNS naming requirements or raises an exception

5 - wallaroo.client

class Client:

Client handle to a Wallaroo platform instance.

Objects of this class serve as the entrypoint to Wallaroo platform functionality.

Client( api_endpoint: str = 'http://api-lb:8080', auth_endpoint: str = '', request_timeout: int = 45, auth_type: Optional[str] = None, gql_client: Optional[gql.client.Client] = None, pg_connection_string: str = 'dbname=postgres user=postgres password=password host=postgres port=5432', interactive: Optional[bool] = None, time_format: str = '%Y-%d-%b %H:%M:%S')

Create a Client handle.

Parameters
  • str api_endpoint: Host/port of the platform API endpoint
  • str auth_endpoint: Host/port of the platform Keycloak instance
  • int timeout: Max timeout of web requests, in seconds
  • str auth_type: Authentication type to use. Can be one of: "none", "sso", "user_password".
  • str pg_connection_string: Postgres connection string
  • bool interactive: If provided and True, some calls will print additional human information, or won't when False. If not provided, interactive defaults to True if running inside Jupyter and False otherwise.
  • str time_format: Preferred strftime format string for displaying timestamps in a human context.
@staticmethod
def get_urls( auth_type: Optional[str], api_endpoint: str, auth_endpoint: str) -> Tuple[Optional[str], str, str]:

Method to calculate the auth values specified as defaults, as params or in ENV vars. Made static to be testable without reaching out to SSO, etc.

def list_tags(self) -> wallaroo.tag.Tags:

List all tags on the platform.

Returns

A list of all tags on the platform.

def list_models(self) -> wallaroo.models.ModelsList:

List all models on the platform.

Returns

A list of all models on the platform.

def list_deployments(self) -> List[wallaroo.deployment.Deployment]:

List all deployments (active or not) on the platform.

Returns

A list of all deployments on the platform.

def search_pipelines( self, search_term: Optional[str] = None, deployed: Optional[bool] = None, created_start: Optional[Datetime] = None, created_end: Optional[Datetime] = None, updated_start: Optional[Datetime] = None, updated_end: Optional[Datetime] = None) -> wallaroo.pipeline_variant.PipelineVariants:

Search for pipelines. All parameters are optional, in which case the result is the same as list_pipelines(). All times are strings to be parsed by datetime.isoformat. Example:

 myclient.search_pipelines(created_end='2022-04-19 13:17:59+00:00', search_term="foo")
Parameters
  • str search_term: Will be matched against tags and model names. Example: "footag123".
  • bool deployed: Pipeline was deployed or not
  • str created_start: Pipeline was created at or after this time
  • str created_end: Pipeline was created at or before this time
  • str updated_start: Pipeline was updated at or before this time
  • str updated_end: Pipeline was updated at or before this time
Returns

A list of all pipelines on the platform.

def search_my_models( self, search_term: Optional[str] = None, uploaded_time_start: Optional[Datetime] = None, uploaded_time_end: Optional[Datetime] = None) -> wallaroo.model.ModelVersions:

Search models owned by you params: search_term: Searches the following metadata: names, shas, versions, file names, and tags uploaded_time_start: Inclusive time of upload uploaded_time_end: Inclusive time of upload

def search_models( self, search_term: Optional[str] = None, uploaded_time_start: Optional[Datetime] = None, uploaded_time_end: Optional[Datetime] = None) -> wallaroo.model.ModelVersions:

Search all models you have access to. params: search_term: Searches the following metadata: names, shas, versions, file names, and tags uploaded_time_start: Inclusive time of upload uploaded_time_end: Inclusive time of upload

def get_user_by_email(self, email: str) -> Optional[wallaroo.user.User]:

Find a user by email

def deactivate_user(self, email: str) -> None:

Deactivates an existing user of the platform

Deactivated users cannot log into the platform. Deactivated users do not count towards the number of allotted user seats from the license.

The Models and Pipelines owned by the deactivated user are not removed from the platform.

Parameters
  • str email: The email address of the user to deactivate.
Returns

None

def activate_user(self, email: str) -> None:

Activates an existing user of the platform that had been previously deactivated.

Activated users can log into the platform.

Parameters
  • str email: The email address of the user to activate.
Returns

None

def list_users(self) -> List[wallaroo.user.User]:

List of all Users on the platform

Returns

A list of all Users on the platform.

def upload_model(self, name: str, path: Union[str, pathlib.Path]) -> wallaroo.model.Model:

Upload a model defined by a file as a new model variant.

Parameters
  • str model_name: The name of the model of which this is a variant. Names must be ASCII alpha-numeric characters or dash (-) only.
  • Union[str, pathlib.Path] path: Path of the model file to upload.
Returns

The created Model.

def register_model_image(self, name: str, image: str) -> wallaroo.model.Model:

Registers an MLFlow model as a new model.

Parameters
  • str model_name: The name of the model of which this is a variant. Names must be ASCII alpha-numeric characters or dash (-) only.
  • str image: Image name of the MLFlow model to register.
Returns

The created Model.

def model_by_name(self, model_class: str, model_name: str) -> wallaroo.model.Model:

Fetch a Model by name.

Parameters
  • str model_class: Name of the model class.
  • str model_name: Name of the variant within the specified model class.
Returns

The Model with the corresponding model and variant name.

def deployment_by_name(self, deployment_name: str) -> wallaroo.deployment.Deployment:

Fetch a Deployment by name.

Parameters
  • str deployment_name: Name of the deployment.
Returns

The Deployment with the corresponding name.

def pipelines_by_name(self, pipeline_name: str) -> List[wallaroo.pipeline.Pipeline]:

Fetch Pipelines by name.

Parameters
  • str pipeline_name: Name of the pipeline.
Returns

The Pipeline with the corresponding name.

def list_pipelines(self) -> List[wallaroo.pipeline.Pipeline]:

List all pipelines on the platform.

Returns

A list of all pipelines on the platform.

def build_pipeline(self, pipeline_name: str) -> wallaroo.pipeline.Pipeline:

Starts building a pipeline with the given pipeline_name, returning a :py:PipelineConfigBuilder:

When completed, the pipeline can be uploaded with .upload()

Parameters
  • pipeline_name string: Name of the pipeline, must be composed of ASCII alpha-numeric characters plus dash (-).
def create_value_split_experiment( self, name: str, meta_key: str, default_model: wallaroo.model_config.ModelConfig, challenger_models: List[Tuple[Any, wallaroo.model_config.ModelConfig]]) -> wallaroo.pipeline.Pipeline:

Creates a new PipelineVariant of a "value-split experiment" type.

Parameters
  • str name: Name of the Pipeline
  • meta_key str: Inference input key on which to redirect inputs to experiment models.
  • default_model ModelConfig: Model to send inferences by default.
  • challenger_models List[Tuple[Any, ModelConfig]]: A list of meta_key values -> Models to send inferences. If the inference data referred to by meta_key is equal to one of the keys in this tuple, that inference is redirected to the corresponding model instead of the default model.
def get_logs( self, topic: str, limit: int = 100, dataset: Union[Sequence[str], str, NoneType] = None, dataset_exclude: Union[Sequence[str], str, NoneType] = None, dataset_separator: Optional[str] = None, arrow: Optional[bool] = False) -> Tuple[Union[wallaroo.logs.LogEntries, pyarrow.lib.Table, pandas.core.frame.DataFrame], str]:
def security_logs(self, limit: int) -> List[dict]:

This function is not available in this release

def get_raw_logs( self, topic: str, start: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, limit: int = 100000, parse: bool = False, verbose: bool = False) -> List[Dict[str, Any]]:

Gets logs from Plateau for a particular time window without attempting to convert them to Inference LogEntries. Logs can be returned as strings or the json parsed into lists and dicts.

Parameters
  • topic str: The name of the topic to query
  • start Optional[datetime]: The start of the time window
  • end Optional[datetime]: The end of the time window
  • limit int: The number of records to retrieve. Note retrieving many records may be a performance bottleneck.
  • parse bool: Wether to attempt to parse the string as a json object.
  • verbose bool: Prints out info to help diagnose issues.
def get_raw_pipeline_inference_logs( self, topic: str, start: datetime.datetime, end: datetime.datetime, model_name: Optional[str] = None, limit: int = 100000, verbose: bool = False) -> List[Dict[str, Any]]:

Gets logs from Plateau for a particular time window and filters them for the model specified.

Parameters
  • pipeline_name str: The name/pipeline_id of the pipeline to query
  • topic str: The name of the topic to query
  • start Optional[datetime]: The start of the time window
  • end Optional[datetime]: The end of the time window
  • model_id: The name of the specific model to filter if any
  • limit int: The number of records to retrieve. Note retrieving many records may be a performance bottleneck.
  • verbose bool: Prints out info to help diagnose issues.
def get_pipeline_inference_dataframe( self, topic: str, start: datetime.datetime, end: datetime.datetime, model_name: Optional[str] = None, limit: int = 100000, verbose=False) -> pandas.core.frame.DataFrame:
def get_assay_results( self, assay_id: int, start: datetime.datetime, end: datetime.datetime) -> wallaroo.assay.AssayAnalysisList:

Gets the assay results for a particular time window, parses them, and returns an AssayAnalysisList of AssayAnalysis.

Parameters
  • assay_id int: The id of the assay we are looking for.
  • start datetime: The start of the time window
  • end datetime: The end of the time window
def build_assay( self, assay_name: str, pipeline: wallaroo.pipeline.Pipeline, model_name: str, baseline_start: datetime.datetime, baseline_end: datetime.datetime) -> wallaroo.assay_config.AssayBuilder:

Creates an AssayBuilder that can be used to configure and create Assays.

Parameters
  • assay_name str: Human friendly name for the assay
  • pipeline Pipeline: The pipeline this assay will work on
  • model_name str: The model that this assay will monitor
  • baseline_start datetime: The start time for the inferences to use as the baseline
  • baseline_end datetime: The end time of the baseline window. the baseline. Windows start immediately after the baseline window and are run at regular intervals continously until the assay is deactivated or deleted.
def upload_assay(self, config: wallaroo.assay_config.AssayConfig) -> int:

Creates an assay in the database.

Parameters
  • config AssayConfig: The configuration for the assay to create.
Returns

The identifier for the assay that was created. :rtype int

def list_assays(self) -> List[wallaroo.assay.Assay]:

List all assays on the platform.

Returns

A list of all assays on the platform.

def create_tag(self, tag_text: str) -> wallaroo.tag.Tag:

Create a new tag with the given text.

def create_workspace(self, workspace_name: str) -> wallaroo.workspace.Workspace:

Create a new workspace with the current user as its first owner.

Parameters
  • str workspace_name: Name of the workspace, must be composed of ASCII alpha-numeric characters plus dash (-)
def list_workspaces(self) -> List[wallaroo.workspace.Workspace]:

List all workspaces on the platform which this user has permission see.

Returns

A list of all workspaces on the platform.

def set_current_workspace( self, workspace: wallaroo.workspace.Workspace) -> wallaroo.workspace.Workspace:

Any calls involving pipelines or models will use the given workspace from then on.

def get_current_workspace(self) -> wallaroo.workspace.Workspace:

Return the current workspace. See also set_current_workspace.

def invite_user(self, email, password=None):
def get_topic_name(self, pipeline_pk_id: int) -> str:
def shim_token(self, token_data: wallaroo.auth.TokenData):
def convert_model( self, path: Union[str, pathlib.Path], source_type: wallaroo.ModelConversion.ModelConversionSource, conversion_arguments: Union[wallaroo.ModelConversion.ConvertKerasArguments, wallaroo.ModelConversion.ConvertSKLearnArguments, wallaroo.ModelConversion.ConvertXGBoostArgs]) -> wallaroo.model.Model:

Given an inbound source model, a model type (xgboost, keras, sklearn), and conversion arguments. Convert the model to onnx, and add to available models for a pipeline.

Parameters
  • Union[str, pathlib.Path] path: The path to the model to convert, i.e. the source model.
  • ModelConversionSource source: The origin model type i.e. keras, sklearn or xgboost.
  • ModelConversionArguments conversion_arguments: A structure representing the arguments for converting a specific model type.
Returns

An instance of the Model being converted to Onnx.

Raises
  • ModelConversionGenericException: On a generic failure, please contact our support for further assistance.
  • ModelConversionFailure: Failure in converting the model type.
  • ModelConversionUnsupportedType: Raised when the source type passed is not supported.
  • ModelConversionSourceFileNotPresent: Raised when the passed source file does not exist.
def mlops(self) -> wallaroo.wallaroo_ml_ops_api_client.client.AuthenticatedClient:

6 - wallaroo.comment

class Comment(wallaroo.object.Object):

Comment that may be attached to models and pipelines.

Comment( client: Optional[wallaroo.client.Client], data: Dict[str, Any], standalone=False)

Base constructor.

Each object requires:

  • a GraphQL client - in order to fill its missing members dynamically
  • an initial data blob - typically from unserialized JSON, contains at
  • least the data for required members (typically the object's primary key) and optionally other data members.
def id(self) -> int:
def user_id(*args, **kwargs):
def message(*args, **kwargs):
def updated_at(*args, **kwargs):
def list_models(self) -> List[wallaroo.model.Model]:

Lists the models this comment is on.

def list_pipelines(self) -> List[wallaroo.pipeline.Pipeline]:

Lists the models this comment is on.

def add_to_model(self, model_pk_id: int):
def remove_from_model(self, model_id: int):
def add_to_pipeline(self, pipeline_id: int):
def remove_from_pipeline(self, pipeline_pk_id: int):

7 - wallaroo.deployment

class WaitForError(builtins.Exception):

Common base class for all non-exit exceptions.

WaitForError(message: str, status: Optional[Dict[str, Any]])
Inherited Members
builtins.BaseException
with_traceback
args
class WaitForDeployError(builtins.RuntimeError):

Unspecified run-time error.

WaitForDeployError(message: str)
Inherited Members
builtins.BaseException
with_traceback
args
class Deployment(wallaroo.object.Object):

Base class for all backend GraphQL API objects.

This class serves as a framework for API objects to be constructed based on a partially-complete JSON response, and to fill in their remaining members dynamically if needed.

Deployment(client: Optional[wallaroo.client.Client], data: Dict[str, Any])

Base constructor.

Each object requires:

  • a GraphQL client - in order to fill its missing members dynamically
  • an initial data blob - typically from unserialized JSON, contains at
  • least the data for required members (typically the object's primary key) and optionally other data members.
def id(self) -> int:
def name(*args, **kwargs):
def deployed(*args, **kwargs):
def model_configs(*args, **kwargs):
def pipeline_variants(*args, **kwargs):
def deploy(self) -> wallaroo.deployment.Deployment:

Deploys this deployment, if it is not already deployed.

If the deployment is already deployed, this is a no-op.

def undeploy(self) -> wallaroo.deployment.Deployment:

Shuts down this deployment, if it is deployed.

If the deployment is already undeployed, this is a no-op.

def status(self) -> Dict[str, Any]:

Returns a dict of deployment status useful for determining if a deployment has succeeded.

Returns

Dict of deployment internal state information.

def check_limit_status(self):
def wait_for_running(self, timeout: Optional[int] = None) -> wallaroo.deployment.Deployment:

Waits for the deployment status to enter the "Running" state.

Will wait up "timeout_request" seconds for the deployment to enter that state. This is set in the "Client" object constructor. Will raise various exceptions on failures.

Returns

The deployment, for chaining.

def wait_for_undeployed(self) -> wallaroo.deployment.Deployment:

Waits for the deployment to end.

Will wait up "timeout_request" seconds for the deployment to enter that state. This is set in the "Client" object constructor. Will raise various exceptions on failures.

Returns

The deployment, for chaining.

def infer( self, tensor: Union[Dict[str, Any], pandas.core.frame.DataFrame, pyarrow.lib.Table], timeout: Union[int, float, NoneType] = None, dataset: Union[Sequence[str], str, NoneType] = None, dataset_exclude: Union[Sequence[str], str, NoneType] = None, dataset_separator: Optional[str] = None) -> Union[List[wallaroo.inference_result.InferenceResult], pandas.core.frame.DataFrame, pyarrow.lib.Table]:

Returns an inference result on this deployment, given a tensor.

Parameters
  • tensor: Union[Dict[str, Any], pd.DataFrame, pa.Table] Inference data. Should be a dictionary. Future improvement: will be a pandas dataframe or arrow table
  • timeout: Optional[Union[int, float]] infer requests will time out after the amount of seconds provided are exceeded. timeout defaults to 15 secs.
  • dataset: Optional[Union[Sequence[str], str]] By default this is set to return, ["time", "out"]. Other available options "check_failures", "metadata"
  • dataset_exclude: Optional[Union[Sequence[str], str]] If set, allows user to exclude parts of dataset.
  • dataset_separator: Optional[str] If set to ".", return dataset will be flattened.
Returns

InferenceResult in dictionary, dataframe or arrow format.

def infer_from_file( self, filename: Union[str, pathlib.Path], timeout: Union[int, float, NoneType] = None, dataset: Optional[Sequence[str]] = None, exclude: Optional[Sequence[str]] = None, dataset_separator: Optional[str] = None) -> Union[List[wallaroo.inference_result.InferenceResult], pandas.core.frame.DataFrame, pyarrow.lib.Table]:
async def batch_infer_from_file( self, filename: Union[str, pathlib.Path], data_key: str = 'tensor', batch_size: int = 1000, connector_limit: int = 4) -> List[wallaroo.inference_result.InferenceResult]:

Async method to run batched inference on a data file for a given deployment.

Parameters
  • str filename: path to an existing file with tensor data in JSON format.
  • str data_key: key which the tensor data is under within the JSON. defaults to "tensor".
  • int batch_size: batch size to use when sending requests to the engine. defaults to 1000.
  • int connector_limit: limit for the amount of TCP connections. defaults to 4.
Returns

List of InferenceResult's.

def replace_model(self, model: wallaroo.model.Model) -> wallaroo.deployment.Deployment:

Replaces the current model with a default-configured Model.

Parameters
  • Model model: Model variant to replace current model with
def replace_configured_model( self, model_config: wallaroo.model_config.ModelConfig) -> wallaroo.deployment.Deployment:

Replaces the current model with a configured variant.

Parameters
  • ModelConfig model_config: Configured model to replace current model with
def internal_url(self) -> str:

Returns the internal inference URL that is only reachable from inside of the Wallaroo cluster by SDK instances deployed in the cluster.

If both pipelines and models are configured on the Deployment, this gives preference to pipelines. The returned URL is always for the first configured pipeline or model.

def url(self) -> str:

Returns the inference URL.

If both pipelines and models are configured on the Deployment, this gives preference to pipelines. The returned URL is always for the first configured pipeline or model.

def logs( self, limit: int = 100, valid: Optional[bool] = None) -> wallaroo.logs.LogEntries:

Deployment.logs() has been removed. Please use pipeline.logs() instead.

8 - wallaroo.deployment_config

class DeploymentConfig(typing.Dict):
def guarantee_workspace_id( self, workspace_id: Optional[int]) -> wallaroo.deployment_config.DeploymentConfig:
Inherited Members
builtins.dict
get
setdefault
pop
popitem
keys
items
values
update
fromkeys
clear
copy
class DeploymentConfigBuilder:
DeploymentConfigBuilder(workspace_id: Optional[int] = None)
def image(self, image: str) -> wallaroo.deployment_config.DeploymentConfigBuilder:
def replica_count(self, count: int) -> wallaroo.deployment_config.DeploymentConfigBuilder:
def replica_autoscale_min_max(self, maximum: int, minimum: int = 0):

Configures the minimum and maximum for autoscaling

def autoscale_cpu_utilization(self, cpu_utilization_percentage: int):

Sets the average CPU metric to scale on in a percentage

def disable_autoscale(self):

Disables autoscaling in the deployment configuration

def cpus( self, core_count: int) -> wallaroo.deployment_config.DeploymentConfigBuilder:
def memory( self, memory_spec: str) -> wallaroo.deployment_config.DeploymentConfigBuilder:
def lb_cpus( self, core_count: int) -> wallaroo.deployment_config.DeploymentConfigBuilder:
def lb_memory( self, memory_spec: int) -> wallaroo.deployment_config.DeploymentConfigBuilder:
def python_load_timeout_secs( self, timeout_secs: int) -> wallaroo.deployment_config.DeploymentConfigBuilder:
def sidekick_cpus( self, model: wallaroo.model.Model, core_count: int) -> wallaroo.deployment_config.DeploymentConfigBuilder:

Sets the number of CPUs to be used for the model's sidekick container. Only affects image-based models (e.g. MLFlow models) in a deployment.

Parameters
  • Model model: The sidekick model to configure.
  • int core_count: Number of CPU cores to use in this sidekick.
Returns

This DeploymentConfigBuilder instance for chaining.

def sidekick_memory( self, model: wallaroo.model.Model, memory_spec: str) -> wallaroo.deployment_config.DeploymentConfigBuilder:

Sets the memory to be used for the model's sidekick container. Only affects image-based models (e.g. MLFlow models) in a deployment.

Parameters
  • Model model: The sidekick model to configure.
  • str memory_spec: Specification of amount of memory (e.g., "2Gi", "500Mi") to use in this sidekick.
Returns

This DeploymentConfigBuilder instance for chaining.

def sidekick_env( self, model: wallaroo.model.Model, environment: Dict[str, str]) -> wallaroo.deployment_config.DeploymentConfigBuilder:

Sets the environment variables to be set for the model's sidekick container. Only affects image-based models (e.g. MLFlow models) in a deployment.

Parameters
  • Model model: The sidekick model to configure.
  • Dict[str, str] environment: Dictionary of environment variables names and their corresponding values to be set in the sidekick container.
Returns

This DeploymentConfigBuilder instance for chaining.

def build(self) -> wallaroo.deployment_config.DeploymentConfig:

9 - wallaroo.engine_config

class EngineConfig:

Wraps an engine config.

EngineConfig( cpus: int, inference_channel_size: Optional[int] = None, model_concurrency: Optional[int] = None, pipeline_config_directory: Optional[str] = None, model_config_directory: Optional[str] = None, model_directory: Optional[str] = None, audit_logging: bool = False, standalone: bool = False)
@staticmethod
def as_standalone( cpus: int, inference_channel_size: Optional[int] = None, model_concurrency: Optional[int] = None, pipeline_config_directory: Optional[str] = None, model_config_directory: Optional[str] = None, model_directory: Optional[str] = None) -> wallaroo.engine_config.EngineConfig:

Creates an EngineConfig for use with standalone mode

def to_json(self) -> str:

Returns a json representation of this object

def to_yaml(self) -> str:

Returns a yaml representation of this object for use with standalone mode

10 - wallaroo.inference_decode

def convert_to_np_dtype(dtype):
def to_nd_array_list(outputs: List[Dict[str, Any]]) -> List[numpy.ndarray]:
def decode_inference_result(entry: Dict[str, Any]) -> List[Dict[str, Any]]:

Decode inference results. Since they have a potentially rich structure, this could become a substantial effort in the future.

TODO: Support multiple outputs TODO: Support multiple data types

def flatten_tensor(prefix: str, numeric_list: list) -> Dict[str, numbers.Number]:

Converts a possibly multidimentionsl list of numbers into a dict where each item in the list is represented by a key value pair in the dict. Does not maintain dimensions since dataframes are 2d. Does not maintain/manage types since it should work for any type supported by numpy.

For example [1,2,3] => {prefix_0: 1, prefix_1: 2, prefix_2: 3}. [[1,2],[3,4]] => {prefix_0_0: 1, prefix_0_1: 2, prefix_1_0: 3, prefix_1_1: 4}

def flatten_dict(prefix: str, input_dict: Dict) -> Dict[str, Any]:

Recursively flattens the input dict, setting the values on the output dict. Assumes simple value types (str, numbers, dicts, and lists). If a value is a dict it is flattened recursively. If a value is a list each item is set as a new k, v pair.

def inference_logs_to_dataframe(logs: List[Dict[str, Any]]) -> pandas.core.frame.DataFrame:

Very similar to dict_list_to_dataframe but specific to inference logs since they have input and output heiararchical fields/structures that must be treated in particular ways.

def dict_list_to_dataframe(assay_results: List[Dict[str, Any]]) -> pandas.core.frame.DataFrame:

Primarily for assay result lists but can be used for any list of simple dicts.

11 - wallaroo.inference_result

class InferenceResult:
InferenceResult(gql_client: Optional[gql.client.Client], data: Dict[str, Any])

Initializes an InferenceResult.

Parameters
  • gql.Client gql_client: GQL client that this can pass to created objects.
  • Dict[str, Any] data: Response parsed from JSON inference result body.
def data(self) -> List[numpy.ndarray]:

Returns the inference result data.

def model(self) -> Tuple[str, str]:

Returns the model this inference was generated by.

def time_elapsed(self) -> datetime.timedelta:

Returns the length of time required for inference.

def timestamp(self) -> datetime.datetime:

Returns the time at which this inference occurred.

def input_data(self) -> Dict[str, Any]:

Returns the input data for this inference result.

def shadow_data(self) -> Optional[Dict[str, numpy.ndarray]]:

12 - wallaroo.logs

def fetch_plateau_logs(server: str, topic: str, limit: int = 100):
class LogEntry:

Wraps a single log entry.

This class is highly experimental, is unsupported/untested, and may change/disappear in the near future.

LogEntry(entry: Dict[str, Any])
class LogEntries(typing.List[wallaroo.logs.LogEntry]):

Wraps a list of log entries.

This class is highly experimental, is unsupported/untested, and may change/disappear in the near future.

Inherited Members
builtins.list
list
clear
copy
append
insert
extend
pop
remove
index
count
reverse
sort
class LogEntriesShadowDeploy(typing.List[wallaroo.logs.LogEntry]):

Wraps a list of log entries.

This class is highly experimental, is unsupported/untested, and may change/disappear in the near future.

LogEntriesShadowDeploy(logs: wallaroo.logs.LogEntries)
Inherited Members
builtins.list
clear
copy
append
insert
extend
pop
remove
index
count
reverse
sort

13 - wallaroo.model

class Model(wallaroo.object.Object):

Wraps a backend Model object.

Model( client: Optional[wallaroo.client.Client], data: Dict[str, Any], standalone=False)

Base constructor.

Each object requires:

  • a GraphQL client - in order to fill its missing members dynamically
  • an initial data blob - typically from unserialized JSON, contains at
  • least the data for required members (typically the object's primary key) and optionally other data members.
@staticmethod
def as_standalone(name: str, version: str, file_name: str) -> wallaroo.model.Model:

Creates a Model intended for use in generating standalone configurations

def id(self) -> int:
def uid(self) -> str:
def name(*args, **kwargs):
def version(*args, **kwargs):
def models_pk_id(*args, **kwargs):
def sha(*args, **kwargs):
def file_name(*args, **kwargs):
def image_path(*args, **kwargs):
def last_update_time(*args, **kwargs):
inputs
outputs
def tags(*args, **kwargs):
def rehydrate_config(*args, **kwargs):
def config(self) -> wallaroo.model_config.ModelConfig:
def configure( self, runtime: Optional[str] = None, tensor_fields: List[str] = None, filter_threshold: float = None, input_schema: Optional[pyarrow.lib.Schema] = None, output_schema: Optional[pyarrow.lib.Schema] = None) -> wallaroo.model.Model:
def logs( self, limit: int = 100, valid: Optional[bool] = None) -> wallaroo.logs.LogEntries:
def deploy( self, pipeline_name: str, deployment_config: Optional[wallaroo.deployment_config.DeploymentConfig] = None) -> wallaroo.pipeline.Pipeline:

Convenience function to quickly deploy a Model. It will configure the model, create a pipeline with a single model step, deploy it, and return the pipeline.

Typically, the configure() method is used to configure a model prior to deploying it. However, if a default configuration is sufficient, this function can be used to quickly deploy with said default configuration.

The filename this Model was generated from needs to have a recognizable file extension so that the runtime can be inferred. Currently, this is:

  • .onnx -> ONNX runtime
Parameters
  • str deployment_name: Name of the deployment to create. Must be unique across all deployments. Deployment names must be ASCII alpha-numeric characters plus dash (-) only.
class ModelVersions(typing.List[wallaroo.model.Model]):

Wraps a list of Models for display in a display-aware environment like Jupyter.

Inherited Members
builtins.list
list
clear
copy
append
insert
extend
pop
remove
index
count
reverse
sort

14 - wallaroo.model_config

class ModelConfig(wallaroo.object.Object):

Wraps a backend ModelConfig object.

ModelConfig( client: Optional[wallaroo.client.Client], data: Dict[str, Any], standalone=False)

Base constructor.

Each object requires:

  • a GraphQL client - in order to fill its missing members dynamically
  • an initial data blob - typically from unserialized JSON, contains at
  • least the data for required members (typically the object's primary key) and optionally other data members.
@staticmethod
def as_standalone( model: wallaroo.model.Model, filter_threshold: Optional[float] = None, runtime: Optional[str] = None, tensor_fields: Optional[List[str]] = None) -> wallaroo.model_config.ModelConfig:

Creates a ModelConfig intended for use in generating standalone configurations

inputs
outputs
def to_yaml(self):

Generates a yaml file for standalone engines

def to_k8s_yaml(self):
def id(self) -> int:
def filter_threshold(*args, **kwargs):
def model(*args, **kwargs):
def runtime(*args, **kwargs):
def tensor_fields(*args, **kwargs):

15 - wallaroo.ModelConversion

class ModelConversionInputType(enum.Enum):

An enumeration.

Float16 = <ModelConversionInputType.Float16: 'float16'>
Float32 = <ModelConversionInputType.Float32: 'float32'>
Float64 = <ModelConversionInputType.Float64: 'float64'>
Int16 = <ModelConversionInputType.Int16: 'int16'>
Int32 = <ModelConversionInputType.Int32: 'int32'>
Int64 = <ModelConversionInputType.Int64: 'int64'>
UInt8 = <ModelConversionInputType.UInt8: 'uint8'>
UInt16 = <ModelConversionInputType.UInt16: 'uint16'>
UInt32 = <ModelConversionInputType.UInt32: 'uint32'>
UInt64 = <ModelConversionInputType.UInt64: 'uint64'>
Boolean = <ModelConversionInputType.Boolean: 'bool'>
Double = <ModelConversionInputType.Double: 'double'>
Inherited Members
enum.Enum
name
value
class ConvertKerasArguments(typing.NamedTuple):

ConvertKerasArguments(name, comment, input_type, dimensions)

ConvertKerasArguments( name: str, comment: Optional[str], input_type: wallaroo.ModelConversion.ModelConversionInputType, dimensions: List[Union[NoneType, int, float]])

Create new instance of ConvertKerasArguments(name, comment, input_type, dimensions)

name: str

Alias for field number 0

comment: Optional[str]

Alias for field number 1

input_type: wallaroo.ModelConversion.ModelConversionInputType

Alias for field number 2

dimensions: List[Union[NoneType, int, float]]

Alias for field number 3

def to_dict(self) -> Dict[str, Any]:
Inherited Members
builtins.tuple
index
count
class ConvertSKLearnArguments(typing.NamedTuple):

ConvertSKLearnArguments(name, number_of_columns, input_type, comment)

ConvertSKLearnArguments( name: str, number_of_columns: int, input_type: wallaroo.ModelConversion.ModelConversionInputType, comment: Optional[str])

Create new instance of ConvertSKLearnArguments(name, number_of_columns, input_type, comment)

name: str

Alias for field number 0

number_of_columns: int

Alias for field number 1

input_type: wallaroo.ModelConversion.ModelConversionInputType

Alias for field number 2

comment: Optional[str]

Alias for field number 3

def to_dict(self) -> Dict[str, Any]:
Inherited Members
builtins.tuple
index
count
class ConvertXGBoostArgs(typing.NamedTuple):

ConvertXGBoostArgs(name, number_of_columns, input_type, comment)

ConvertXGBoostArgs( name: str, number_of_columns: int, input_type: wallaroo.ModelConversion.ModelConversionInputType, comment: Optional[str])

Create new instance of ConvertXGBoostArgs(name, number_of_columns, input_type, comment)

name: str

Alias for field number 0

number_of_columns: int

Alias for field number 1

input_type: wallaroo.ModelConversion.ModelConversionInputType

Alias for field number 2

comment: Optional[str]

Alias for field number 3

def to_dict(self) -> Dict[str, Any]:
Inherited Members
builtins.tuple
index
count
class ModelConversionSource(enum.Enum):

An enumeration.

KERAS = <ModelConversionSource.KERAS: 'keras'>
XGBOOST = <ModelConversionSource.XGBOOST: 'xgboost'>
SKLEARN = <ModelConversionSource.SKLEARN: 'sklearn'>
Inherited Members
enum.Enum
name
value
class ModelConversionGenericException(builtins.Exception):

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class ModelConversionFailure(builtins.Exception):

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class ModelConversionUnsupportedType(builtins.Exception):

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args
class ModelConversionSourceFileNotPresent(builtins.Exception):

Common base class for all non-exit exceptions.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
args

16 - wallaroo.models

class Models(wallaroo.object.Object):

A Wallaroo Model object. Models may have multiple versions, accessed via .versions()

Models( client: Optional[wallaroo.client.Client], data: Dict[str, Any], standalone=False)

Base constructor.

Each object requires:

  • a GraphQL client - in order to fill its missing members dynamically
  • an initial data blob - typically from unserialized JSON, contains at
  • least the data for required members (typically the object's primary key) and optionally other data members.
def id(self) -> int:
def name(*args, **kwargs):
def owner_id(*args, **kwargs):
def last_update_time(*args, **kwargs):
def created_at(*args, **kwargs):
def versions(*args, **kwargs):
class ModelsList(typing.List[wallaroo.models.Models]):

Wraps a list of Models for display in a display-aware environment like Jupyter.

Inherited Members
builtins.list
list
clear
copy
append
insert
extend
pop
remove
index
count
reverse
sort

17 - wallaroo.notify

class Notification:
Notification()
def to_json(self):
class Email(Notification):
Email(to)
def to_json(self):
@classmethod
def from_json(cls, json):
def to_json(notifications):
def from_json(json):
class AlertConfiguration:
AlertConfiguration(name, expression, notifications)
@classmethod
def from_json(Cls, json):
def to_json(self):

18 - wallaroo.object

class DehydratedValue:

Represents a not-set sentinel value.

Attributes that are null in the database will be returned as None in Python, and we want them to be set as such, so None cannot be used as a sentinel value signaling that an optional attribute is not yet set. Objects of this class fill that role instead.

DehydratedValue()
def rehydrate(attr):

Decorator that rehydrates the named attribute if needed.

This should decorate getter calls for an attribute:

@rehydrate(_foo_attr)
def foo_attr(self):
    return self._foo_attr

This will cause the API object to "rehydrate" (perform a query to fetch and fill in all attributes from the database) if the named attribute is not set.

def value_if_present( data: Dict[str, Any], path: str) -> Union[Any, wallaroo.object.DehydratedValue]:

Returns a value in a nested dictionary, or DehydratedValue.

Parameters
  • str path: Dot-delimited path within a nested dictionary; e.g. foo.bar.baz
Returns

The requested value inside the dictionary, or DehydratedValue if it doesn't exist.

class RequiredAttributeMissing(builtins.Exception):

Raised when an API object is initialized without a required attribute.

RequiredAttributeMissing(class_name: str, attribute_name: str)
Inherited Members
builtins.BaseException
with_traceback
args
class ModelUploadError(builtins.Exception):

Raised when a model file fails to upload.

ModelUploadError(e)
Inherited Members
builtins.BaseException
with_traceback
args
class EntityNotFoundError(builtins.Exception):

Raised when a query for a specific API object returns no results.

This is specifically for queries by unique identifiers that are expected to return exactly one result; queries that can return 0 to many results should return empty list instead of raising this exception.

EntityNotFoundError(entity_type: str, params: Dict[str, str])
Inherited Members
builtins.BaseException
with_traceback
args
class LimitError(builtins.Exception):

Raised when deployment fails.

LimitError(e)
Inherited Members
builtins.BaseException
with_traceback
args
class DeploymentError(builtins.Exception):

Raised when deployment fails.

DeploymentError(e)
Inherited Members
builtins.BaseException
with_traceback
args
class InferenceError(builtins.Exception):

Raised when inference fails.

InferenceError(error: Dict[str, str])
Inherited Members
builtins.BaseException
with_traceback
args
class InvalidNameError(builtins.Exception):

Raised when an entity's name does not meet the expected critieria.

Parameters
  • str name: the name string that is invalid
  • str req: a string description of the requirement
InvalidNameError(name: str, req: str)
Inherited Members
builtins.BaseException
with_traceback
args
class CommunicationError(builtins.Exception):

Raised when some component cannot be contacted. There is a networking, configuration or installation problem.

CommunicationError(e)
Inherited Members
builtins.BaseException
with_traceback
args
class Object(abc.ABC):

Base class for all backend GraphQL API objects.

This class serves as a framework for API objects to be constructed based on a partially-complete JSON response, and to fill in their remaining members dynamically if needed.

Object( gql_client: Optional[gql.client.Client], data: Dict[str, Any], standalone=False)

Base constructor.

Each object requires:

  • a GraphQL client - in order to fill its missing members dynamically
  • an initial data blob - typically from unserialized JSON, contains at
  • least the data for required members (typically the object's primary key) and optionally other data members.

19 - wallaroo.pipeline

def update_timestamp(f):
class Pipeline(wallaroo.object.Object):

A pipeline is an execution context for models. Pipelines contain Steps, which are often Models. Pipelines can be deployed or undeployed.

Pipeline(client: Optional[wallaroo.client.Client], data: Dict[str, Any])

Base constructor.

Each object requires:

  • a GraphQL client - in order to fill its missing members dynamically
  • an initial data blob - typically from unserialized JSON, contains at
  • least the data for required members (typically the object's primary key) and optionally other data members.
def builder(self) -> wallaroo.pipeline_config.PipelineConfigBuilder:
def id(self) -> int:
def owner_id(*args, **kwargs):
def create_time(*args, **kwargs):
def last_update_time(*args, **kwargs):
def name(*args, **kwargs):
def variants(*args, **kwargs):
def tags(*args, **kwargs):
def logs( self, limit: int = 100, valid: Optional[bool] = None, dataset: Union[Sequence[str], str, NoneType] = None, dataset_exclude: Union[Sequence[str], str, NoneType] = None, dataset_separator: Optional[str] = None, arrow: Optional[bool] = False) -> Union[wallaroo.logs.LogEntries, pyarrow.lib.Table]:
def logs_shadow_deploy(self):
def url(self) -> str:

Returns the inference URL for this pipeline.

def deploy( self, pipeline_name: Optional[str] = None, deployment_config: Optional[wallaroo.deployment_config.DeploymentConfig] = None) -> wallaroo.pipeline.Pipeline:

Deploy pipeline. pipeline_name is optional if deploy was called previously. When specified, pipeline_name must be ASCII alpha-numeric characters, plus dash (-) only.

def definition(self) -> str:

Get the current definition of the pipeline as a string

def get_topic_name(self) -> str:
def undeploy(self) -> wallaroo.pipeline.Pipeline:
def infer(self, *args, **kwargs):

Returns an inference result on this deployment, given a tensor.

Parameters
  • tensor: Union[Dict[str, Any], pd.DataFrame, pa.Table] Inference data. Should be a dictionary. Future improvement: will be a pandas dataframe or arrow table
  • timeout: Optional[Union[int, float]] infer requests will time out after the amount of seconds provided are exceeded. timeout defaults to 15 secs.
  • dataset: Optional[list] By default this is set to return, ["time", "out"]. Other available options "check_failures", "metadata"
  • dataset_exclude: Optional[Union[Sequence[str], str]] If set, allows user to exclude parts of dataset.
  • dataset_separator: Optional[Union[Sequence[str], str]] If set to ".", return dataset will be flattened.
Returns

InferenceResult in dictionary, dataframe or arrow format.

def infer_from_file(self, *args, **kwargs):

Returns an inference result on this deployment, given tensors in a file.

async def batch_infer_from_file( self, filename: Union[str, pathlib.Path], data_key: str = 'tensor', batch_size: int = 1000, connector_limit: int = 4) -> List[wallaroo.inference_result.InferenceResult]:

Async method to run batched inference on a data file for a given deployment.

Parameters
  • str filename: path to an existing file with tensor data in JSON format.
  • str data_key: key which the tensor data is under within the JSON. defaults to "tensor".
  • int batch_size: batch size to use when sending requests to the engine. defaults to 1000.
  • int connector_limit: limit for the amount of TCP connections. defaults to 4.
Returns

List of InferenceResult's.

def status(self) -> Dict[str, Any]:

Status of pipeline

def steps(self) -> List[wallaroo.pipeline_config.Step]:

Returns a list of the steps of a pipeline. Not exactly a shim

def model_configs(self) -> List[wallaroo.model_config.ModelConfig]:

Returns a list of the model configs of a pipeline. Not exactly a shim

def remove_step(self, index: int) -> wallaroo.pipeline.Pipeline:

Remove a step at a given index

def add_model_step(self, model: wallaroo.model.Model) -> wallaroo.pipeline.Pipeline:

Perform inference with a single model.

def replace_with_model_step( self, index: int, model: wallaroo.model.Model) -> wallaroo.pipeline.Pipeline:

Replaces the step at the given index with a model step

def add_multi_model_step( self, models: Iterable[wallaroo.model.Model]) -> wallaroo.pipeline.Pipeline:

Perform inference on the same input data for any number of models.

def replace_with_multi_model_step( self, index: int, models: Iterable[wallaroo.model.Model]) -> wallaroo.pipeline.Pipeline:

Replaces the step at the index with a multi model step

def add_audit(self, slice) -> wallaroo.pipeline.Pipeline:

Run audit logging on a specified slice of model outputs.

The slice must be in python-like format. start:, start:end, and :end are supported.

def replace_with_audit(self, index: int, audit_slice: str) -> wallaroo.pipeline.Pipeline:

Replaces the step at the index with an audit step

def add_select(self, index: int) -> wallaroo.pipeline.Pipeline:

Select only the model output with the given index from an array of outputs.

def replace_with_select(self, step_index: int, select_index: int) -> wallaroo.pipeline.Pipeline:

Replaces the step at the index with a select step

def add_key_split( self, default: wallaroo.model.Model, meta_key: str, options: Dict[str, wallaroo.model.Model]) -> wallaroo.pipeline.Pipeline:

Split traffic based on the value at a given meta_key in the input data, routing to the appropriate model.

If the resulting value is a key in options, the corresponding model is used. Otherwise, the default model is used for inference.

def replace_with_key_split( self, index: int, default: wallaroo.model.Model, meta_key: str, options: Dict[str, wallaroo.model.Model]) -> wallaroo.pipeline.Pipeline:

Replace the step at the index with a key split step

def add_random_split( self, weighted: Iterable[Tuple[float, wallaroo.model.Model]], hash_key: Optional[str] = None) -> wallaroo.pipeline.Pipeline:

Routes inputs to a single model, randomly chosen from the list of weighted options.

Each model receives inputs that are approximately proportional to the weight it is assigned. For example, with two models having weights 1 and 1, each will receive roughly equal amounts of inference inputs. If the weights were changed to 1 and 2, the models would receive roughly 33% and 66% respectively instead.

When choosing the model to use, a random number between 0.0 and 1.0 is generated. The weighted inputs are mapped to that range, and the random input is then used to select the model to use. For example, for the two-models equal-weight case, a random key of 0.4 would route to the first model. 0.6 would route to the second.

To support consistent assignment to a model, a hash_key can be specified. This must be between 0.0 and 1.0. The value at this key, when present in the input data, will be used instead of a random number for model selection.

def replace_with_random_split( self, index: int, weighted: Iterable[Tuple[float, wallaroo.model.Model]], hash_key: Optional[str] = None) -> wallaroo.pipeline.Pipeline:

Replace the step at the index with a random split step

def add_shadow_deploy( self, champion: wallaroo.model.Model, challengers: Iterable[wallaroo.model.Model]) -> wallaroo.pipeline.Pipeline:

Create a "shadow deployment" experiment pipeline. The champion model and all challengers are run for each input. The result data for all models is logged, but the output of the champion is the only result returned.

This is particularly useful for "burn-in" testing a new model with real world data without displacing the currently proven model.

This is currently implemented as three steps: A multi model step, an audit step, and a select step. To remove or replace this step, you need to remove or replace all three. You can remove steps using pipeline.remove_step

def replace_with_shadow_deploy( self, index: int, champion: wallaroo.model.Model, challengers: Iterable[wallaroo.model.Model]) -> wallaroo.pipeline.Pipeline:

Replace a given step with a shadow deployment

def add_validation( self, name: str, validation: wallaroo.checks.Expression) -> wallaroo.pipeline.Pipeline:

Add a validation with the given name. All validations are run on all outputs, and all failures are logged.

def add_alert( self, name: str, alert: wallaroo.checks.Alert, notifications: List[wallaroo.notify.Notification]) -> wallaroo.pipeline.Pipeline:
def replace_with_alert( self, index: int, name: str, alert: wallaroo.checks.Alert, notifications: List[wallaroo.notify.Notification]) -> wallaroo.pipeline.Pipeline:

Replace the step at the given index with the specified alert

def clear(self) -> wallaroo.pipeline.Pipeline:

Remove all steps from the pipeline. This might be desireable if replacing models, for example.

def list_explainability_configs(self) -> List[wallaroo.explainability.ExplainabilityConfig]:

List the explainability configs we've created.

def get_explainability_config( self, expr: Union[str, wallaroo.explainability.ExplainabilityConfig]) -> wallaroo.explainability.ExplainabilityConfig:

Get the details of an explainability config.

def create_explainability_config(self, feature_names: Sequence[str], num_points=10):

Create a shap config to be used later for reference and adhoc requests.

class Pipelines(typing.List[wallaroo.pipeline.Pipeline]):

Wraps a list of pipelines for display in a display-aware environment like Jupyter.

Inherited Members
builtins.list
list
clear
copy
append
insert
extend
pop
remove
index
count
reverse
sort

20 - wallaroo.pipeline_config

class ValidDataType(builtins.str, enum.Enum):

An enumeration.

f32 = <ValidDataType.f32: 'f32'>
f64 = <ValidDataType.f64: 'f64'>
i8 = <ValidDataType.i8: 'i8'>
u8 = <ValidDataType.u8: 'u8'>
i16 = <ValidDataType.i16: 'i16'>
u16 = <ValidDataType.u16: 'u16'>
i32 = <ValidDataType.i32: 'i32'>
u32 = <ValidDataType.u32: 'u32'>
i64 = <ValidDataType.i64: 'i64'>
u64 = <ValidDataType.u64: 'u64'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class ModelConfigsForStep:
ModelConfigsForStep(model_configs: List[wallaroo.model_config.ModelConfig])
class ModelForStep:
ModelForStep(name, version, sha)
def to_json(self):
@classmethod
def from_json(cls, json_dict: Dict[str, str]):
@classmethod
def from_model(cls, model: wallaroo.model.Model):
class ModelWeight:
ModelWeight(weight: float, model: wallaroo.pipeline_config.ModelForStep)
def to_json(self):
@classmethod
def from_json(cls, json_dict: Dict[str, Any]):
@classmethod
def from_tuple(cls, tup: Tuple[float, wallaroo.model.Model]):
class RowToModel:
RowToModel(row_index: int, model: wallaroo.pipeline_config.ModelForStep)
def to_json(self):
@classmethod
def from_json(cls, json_dict: Dict[str, Any]):
class Step:
Step()
def to_json(self):
def is_inference_step(self):
@staticmethod
def from_json(json_dict: Dict):
class Average(Step):
Average()
def to_json(self):
@staticmethod
def from_json(json_dict: Dict):
Inherited Members
Step
is_inference_step
class AuditResults(Step):
AuditResults(start: int, end: Optional[int] = None)
def to_json(self):
@staticmethod
def from_json(json_dict: Dict):
Inherited Members
Step
is_inference_step
class Check(Step):
Check(tree: str)
def to_json(self):
@classmethod
def from_name_and_validation( cls, name: str, validation: wallaroo.checks.Expression, gauges: List[str] = []):
@staticmethod
def from_json(json_dict: Dict):
Inherited Members
Step
is_inference_step
class ColumnsSelect(Step):
ColumnsSelect(columns: List[int])
def to_json(self):
@staticmethod
def from_json(json_dict: Dict):
Inherited Members
Step
is_inference_step
class ColumnsToRows(Step):
ColumnsToRows()
def to_json(self):
@staticmethod
def from_json(json_dict: Dict):
Inherited Members
Step
is_inference_step
class InputDataToType(Step):
InputDataToType(data_type: wallaroo.pipeline_config.ValidDataType)
def to_json(self):
@staticmethod
def from_json(json_dict: Dict):
Inherited Members
Step
is_inference_step
class ModelInference(Step):
ModelInference(models: List[wallaroo.pipeline_config.ModelForStep])
def to_json(self):
@staticmethod
def from_json(json_dict: Dict):
def is_inference_step(self):
class RowsToModels(Step):
RowsToModels(rows_to_models: List[wallaroo.pipeline_config.RowToModel])
def to_json(self):
@staticmethod
def from_json(json_dict: Dict):
def is_inference_step(self):
class Nth(Step):
Nth(index: int)
def to_json(self):
@staticmethod
def from_json(json_dict: Dict):
Inherited Members
Step
is_inference_step
class MultiOut(Step):
MultiOut()
def to_json(self):
@staticmethod
def from_json(json_dict: Dict):
Inherited Members
Step
is_inference_step
class MetaValueSplit(Step):
MetaValueSplit( split_key: str, control: wallaroo.pipeline_config.ModelForStep, routes: Dict[str, wallaroo.pipeline_config.ModelForStep])
def to_json(self):
@staticmethod
def from_json(json_dict: Dict):
def is_inference_step(self):
class RandomSplit(Step):
RandomSplit( weights: List[wallaroo.pipeline_config.ModelWeight], hash_key: Optional[str] = None)
def to_json(self):
@staticmethod
def from_json(json_dict: Dict):
def is_inference_step(self):
class PipelineConfig:
PipelineConfig( pipeline_name: str, steps: Iterable[wallaroo.pipeline_config.Step], alert_configurations: Iterable[wallaroo.notify.AlertConfiguration])
@classmethod
def from_json(Klass, json):
def to_json(self):
def to_yaml(self):
class PipelineConfigBuilder:
PipelineConfigBuilder( client: Optional[wallaroo.client.Client], pipeline_name: str, standalone=False)
@staticmethod
def as_standalone(pipeline_name: str):
def config(self) -> wallaroo.pipeline_config.PipelineConfig:
def upload(self) -> wallaroo.pipeline.Pipeline:
def remove_step(self, index: int):

Remove a step at a given index

def add_model_step( self, model: wallaroo.model.Model) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Perform inference with a single model.

def replace_with_model_step( self, index: int, model: wallaroo.model.Model) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Replaces the step at the given index with a model step

def add_multi_model_step( self, models: Iterable[wallaroo.model.Model]) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Perform inference on the same input data for any number of models.

def replace_with_multi_model_step( self, index: int, models: Iterable[wallaroo.model.Model]) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Replaces the step at the index with a multi model step

def add_audit(self, audit_slice: str) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Run audit logging on a specified slice of model outputs.

The slice must be in python-like format. start:, start:end, and :end are supported.

def replace_with_audit( self, index: int, audit_slice: str) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Replaces the step at the index with an audit step

def add_select(self, index: int) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Select only the model output with the given index from an array of outputs.

def add_multi_out(self):
def replace_with_select( self, step_index: int, select_index: int) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Replaces the step at the index with a select step

def add_key_split( self, default: wallaroo.model.Model, meta_key: str, options: Dict[str, wallaroo.model.Model]) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Split traffic based on the value at a given meta_key in the input data, routing to the appropriate model.

If the resulting value is a key in options, the corresponding model is used. Otherwise, the default model is used for inference.

def replace_with_key_split( self, index: int, default: wallaroo.model.Model, meta_key: str, options: Dict[str, wallaroo.model.Model]) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Replace the step at the index with a key split step

def add_random_split( self, weighted: Iterable[Tuple[float, wallaroo.model.Model]], hash_key: Optional[str] = None) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Routes inputs to a single model, randomly chosen from the list of weighted options.

Each model receives inputs that are approximately proportional to the weight it is assigned. For example, with two models having weights 1 and 1, each will receive roughly equal amounts of inference inputs. If the weights were changed to 1 and 2, the models would receive roughly 33% and 66% respectively instead.

When choosing the model to use, a random number between 0.0 and 1.0 is generated. The weighted inputs are mapped to that range, and the random input is then used to select the model to use. For example, for the two-models equal-weight case, a random key of 0.4 would route to the first model. 0.6 would route to the second.

To support consistent assignment to a model, a hash_key can be specified. This must be between 0.0 and 1.0. The value at this key, when present in the input data, will be used instead of a random number for model selection.

def replace_with_random_split( self, index: int, weighted: Iterable[Tuple[float, wallaroo.model.Model]], hash_key: Optional[str] = None) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Replace the step at the index with a random split step

def add_shadow_deploy( self, champion: wallaroo.model.Model, challengers: Iterable[wallaroo.model.Model]) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Create a "shadow deployment" experiment pipeline. The champion model and all challengers are run for each input. The result data for all models is logged, but the output of the champion is the only result returned.

This is particularly useful for "burn-in" testing a new model with real world data without displacing the currently proven model.

This is currently implemented as three steps: A multi model step, an audit step, and a select step. To remove or replace this step, you need to remove or replace all three. You can remove steps using pipeline.remove_step

def replace_with_shadow_deploy( self, index: int, champion: wallaroo.model.Model, challengers: Iterable[wallaroo.model.Model]) -> wallaroo.pipeline_config.PipelineConfigBuilder:
def add_validation( self, name: str, validation: wallaroo.checks.Expression) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Add a validation with the given name. All validations are run on all outputs, and all failures are logged.

def replace_with_validation( self, index: int, name: str, validation: wallaroo.checks.Expression) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Replace the step at the given index with a validation step

def add_alert( self, name: str, alert: wallaroo.checks.Alert, notifications: List[wallaroo.notify.Notification]) -> wallaroo.pipeline_config.PipelineConfigBuilder:
def replace_with_alert( self, index, name: str, alert: wallaroo.checks.Alert, notifications: List[wallaroo.notify.Notification]) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Replace the step at the given index with the specified alert

def clear(self) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Remove all steps from the pipeline. This might be desireable if replacing models, for example.

21 - wallaroo.pipeline_variant

class PipelineVariant(wallaroo.object.Object):

Base class for all backend GraphQL API objects.

This class serves as a framework for API objects to be constructed based on a partially-complete JSON response, and to fill in their remaining members dynamically if needed.

PipelineVariant(client: Optional[wallaroo.client.Client], data: Dict[str, Any])

Base constructor.

Each object requires:

  • a GraphQL client - in order to fill its missing members dynamically
  • an initial data blob - typically from unserialized JSON, contains at
  • least the data for required members (typically the object's primary key) and optionally other data members.
def id(self) -> int:
def create_time(*args, **kwargs):
def last_update_time(*args, **kwargs):
def name(*args, **kwargs):
def definition(*args, **kwargs):
def pipeline(*args, **kwargs):
def deployments(*args, **kwargs):
def model_configs(*args, **kwargs):
def deploy( self, deployment_name: str, model_configs: List[wallaroo.model_config.ModelConfig], config: Optional[wallaroo.deployment_config.DeploymentConfig] = None) -> wallaroo.deployment.Deployment:

Deploys this PipelineVariant.

Parameters
  • str deployment_name: Name of the new Deployment. Must be unique across all deployments.
  • List[ModelConfig] model_configs: List of the configured models to use. These must be the same ModelConfigs used when creating the Pipeline.
  • Optional[DeploymentConfig] config: Deployment configuration to use.
Returns

A Deployment object for the resulting deployment.

class PipelineVariants(typing.List[wallaroo.pipeline_variant.PipelineVariant]):

Wraps a list of pipelines for display in a display-aware environment like Jupyter.

Inherited Members
builtins.list
list
clear
copy
append
insert
extend
pop
remove
index
count
reverse
sort

22 - wallaroo.queries

def named(name: str) -> str:

23 - wallaroo.standalone_client

class StandaloneClient:
StandaloneClient( host: str, port: int, model: Optional[wallaroo.model.Model] = None, pipeline_config: Optional[wallaroo.pipeline_config.PipelineConfig] = None, interactive: Optional[bool] = None)
def status(self) -> Dict[str, Any]:

Returns a dict of standalone engine model status.

Example: {'models': [{'class': 'ccfraud', 'name': 'z5', 'status': 'Running'}]}

Example: {'models': [{'class': 'postprocess', 'name': 'first-postprocess', 'status': 'Running'}, {'class': 'noopfloats', 'name': 'noopv1', 'status': 'Running'}, {'class': 'preprocess', 'name': 'first-preprocess', 'status': 'Running'}]}

Example: {"models": [{"class":"synerror", "name":"v1", "status":{"Error":"Python compile or runtime error [" File \"//tmp/.tmpk7mJpI/syntax_error.py\", line 1", " PLEASE CRASH HERE", " ^", "SyntaxError: invalid syntax"]"}}]}

def infer( self, tensor: Dict[str, Any]) -> List[wallaroo.inference_result.InferenceResult]:
def infer_from_file( self, filename: Union[str, pathlib.Path]) -> List[wallaroo.inference_result.InferenceResult]:

24 - wallaroo.tag

class Tag(wallaroo.object.Object):

Tags that may be attached to models and pipelines.

Tag( client: Optional[wallaroo.client.Client], data: Dict[str, Any], standalone=False)

Base constructor.

Each object requires:

  • a GraphQL client - in order to fill its missing members dynamically
  • an initial data blob - typically from unserialized JSON, contains at
  • least the data for required members (typically the object's primary key) and optionally other data members.
def id(self) -> int:
def tag(*args, **kwargs):
def model_tags(*args, **kwargs):
def pipeline_tags(*args, **kwargs):
def list_models(self) -> List[wallaroo.model.Model]:

Lists the models this tag is on.

def add_to_model(self, model_id: int):
def remove_from_model(self, model_id: int):
def list_pipelines(self) -> List[wallaroo.pipeline.Pipeline]:

Lists the pipelines this tag is on.

def add_to_pipeline(self, pipeline_id: int):
def remove_from_pipeline(self, pipeline_id: int):
class Tags(typing.List[wallaroo.tag.Tag]):

Wraps a list of tags for display in a display-aware environment like Jupyter.

Inherited Members
builtins.list
list
clear
copy
append
insert
extend
pop
remove
index
count
reverse
sort

25 - wallaroo.unwrap

def unwrap(v: Optional[~T]) -> ~T:

Simple function to placate pylance

26 - wallaroo.user

class User:

A platform User.

User(client, data: Dict[str, Any], standalone=False)
def id(self) -> str:
def email(self) -> str:
def username(self) -> str:
def enabled(self) -> bool:
@staticmethod
def list_users( auth, api_endpoint: str = 'http://api-lb:8080', auth_endpoint: str = 'http://api-lb:8080'):
@staticmethod
def invite_user( email, password, auth, api_endpoint: str = 'http://api-lb:8080', auth_endpoint: str = 'http://api-lb:8080'):

27 - wallaroo.user_type

class UserType(builtins.str, enum.Enum):

Represents a workspace user's role.

OWNER = <UserType.OWNER: 'owner'>
COLLABORATOR = <UserType.COLLABORATOR: 'collaborator'>
@staticmethod
def from_str(label: str):

Creates a UserType from a str

Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans

28 - wallaroo.workspace

class Workspace(wallaroo.object.Object):

Workspace provides a user and visibility context for access to models and pipelines.

Workspace( client: Optional[wallaroo.client.Client], data: Dict[str, Any], standalone=False)

Base constructor.

Each object requires:

  • a GraphQL client - in order to fill its missing members dynamically
  • an initial data blob - typically from unserialized JSON, contains at
  • least the data for required members (typically the object's primary key) and optionally other data members.
def to_json(self):
def id(self) -> int:
def name(*args, **kwargs):
def archived(*args, **kwargs):
def created_at(*args, **kwargs):
def created_by(*args, **kwargs):
def models(*args, **kwargs):

Returns a List of Models objects that have been created in this workspace.

def pipelines(*args, **kwargs):
def users(*args, **kwargs):
def add_user(self, user_email: str) -> wallaroo.workspace.Workspace:

Add a user to workspace as participant

def add_owner(self, user_email: str) -> wallaroo.workspace.Workspace:

Add a user to workspace as owner

def remove_user(self, user_email: str):
class Workspaces(typing.List[wallaroo.workspace.Workspace]):

Wraps a list of workspaces for display.

Inherited Members
builtins.list
list
clear
copy
append
insert
extend
pop
remove
index
count
reverse
sort