1 - wallaroo.assay

An Assay represents a record in the database. An assay contains high-level attributes such as name, status, and active, as well as the sub-objects Baseline, Window, and Summarizer, which specify how the baseline is derived, how the windows should be created, and how the analysis should be conducted.

Assay(client: Optional[wallaroo.client.Client], data: Dict[str, Any])

Base constructor.

Each object requires:

  • a GraphQL client, in order to fill its missing members dynamically
  • an initial data blob, typically from deserialized JSON, containing at least the data for required members (typically the object's primary key) and optionally other data members
def turn_on(self):

Sets the Assay to active, causing it to run and backfill any missing analyses.

def turn_off(self):

Disables the Assay. No further analysis will be conducted until the assay is enabled.

def set_alert_threshold(self, threshold: float):

Sets the alert threshold at the specified level. The status in the AssayAnalysis will show if this level is exceeded; however, alerting/notifications are not currently implemented.

def set_warning_threshold(self, threshold: float):

Sets the warning threshold at the specified level. The status in the AssayAnalysis will show if this level is exceeded; however, alerting/notifications are not currently implemented.

def meta_df(assay_result: Dict, index_name) -> pandas.core.frame.DataFrame:

Creates a dataframe for the metadata in the baseline or window, excluding the edge information.

Parameters
  • assay_result: The dict of the raw assay result
def edge_df(window_or_baseline: Dict) -> pandas.core.frame.DataFrame:

Creates a dataframe specifically for the edge information in the baseline or window.

Parameters
  • window_or_baseline: The dict from the assay result of either the window or baseline
class AssayAnalysis:

The AssayAnalysis class helps handle the assay analysis logs from the Plateau logs. These logs are JSON documents with meta information on the assay and analysis, summary information on the baseline and window, and information on the comparison between them.

AssayAnalysis(raw: Dict[str, Any])
def compare_basic_stats(self) -> pandas.core.frame.DataFrame:

Creates a simple dataframe making it easy to compare a baseline and window.

def baseline_stats(self) -> pandas.core.frame.DataFrame:

Creates a simple dataframe with the basic stats data for a baseline.

def compare_bins(self) -> pandas.core.frame.DataFrame:

Creates a simple dataframe to compare the bin/edge information of baseline and window.

def baseline_bins(self) -> pandas.core.frame.DataFrame:

Creates a simple dataframe with the edge/bin data for a baseline.

def chart(self, show_scores=True):

Quickly create a chart showing the bins, values and scores of an assay analysis. show_scores will also label each bin with its final weighted (if specified) score.

class AssayAnalysisList:

Helper class primarily to easily create a dataframe from a list of AssayAnalysis objects.

AssayAnalysisList(raw: List[wallaroo.assay.AssayAnalysis])
def to_dataframe(self) -> pandas.core.frame.DataFrame:

Creates and returns a summary dataframe from the assay results.

def to_full_dataframe(self) -> pandas.core.frame.DataFrame:

Creates and returns a dataframe with all values including inputs and outputs from the assay results.

def chart_df( self, df: Union[pandas.core.frame.DataFrame, pandas.core.series.Series], title: str, nth_x_tick=None):

Creates a basic chart of the scores from a dataframe created from an assay analysis list.

def chart_scores(self, title: Optional[str] = None, nth_x_tick=4):

Creates a basic chart of the scores from an AssayAnalysisList.

def chart_iopaths( self, labels: Optional[List[str]] = None, selected_labels: Optional[List[str]] = None, nth_x_tick=None):

Creates a basic chart of the scores for each unique iopath of an AssayAnalysisList.

class Assays(typing.List[wallaroo.assay.Assay]):

Wraps a list of assays for display in an HTML display-aware environment like Jupyter.

Inherited Members
builtins.list
list
clear
copy
append
insert
extend
pop
remove
index
count
reverse
sort

2 - wallaroo.assay_config

def unwrap(v: Optional[~T]) -> ~T:

Simple function to placate pylance

class BinMode(builtins.str, enum.Enum):

How the bins should be calculated. NONE - no bins; only useful if we only care about the mean, median, etc. EQUAL - evenly spaced bins, each of width (max - min) / num_bins. QUANTILE - based on percentages; if num_bins is 5 then quintiles, so bins are created at the 20%, 40%, 60%, 80% and 100% points. PROVIDED - the user provides the edge points for the bins.

NONE = <BinMode.NONE: 'None'>
EQUAL = <BinMode.EQUAL: 'Equal'>
QUANTILE = <BinMode.QUANTILE: 'Quantile'>
PROVIDED = <BinMode.PROVIDED: 'Provided'>
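The difference between EQUAL and QUANTILE binning can be illustrated with a standalone sketch (using numpy, not SDK code) over a skewed sample:

```python
import numpy as np

# Standalone illustration (not SDK code) of EQUAL vs. QUANTILE binning
# for num_bins = 5 over a skewed sample with one large outlier.
values = np.array([1.0, 1, 2, 2, 2, 3, 3, 4, 5, 100])
num_bins = 5

# EQUAL: evenly spaced edges between min and max.
equal_edges = np.linspace(values.min(), values.max(), num_bins + 1)

# QUANTILE: edges at the 20%, 40%, 60%, 80% and 100% points.
quantile_edges = np.quantile(values, np.linspace(0, 1, num_bins + 1))

print(equal_edges)     # wide, mostly empty bins because of the outlier
print(quantile_edges)  # narrow bins where the data actually is
```

With heavy-tailed data the equal-width bins are dominated by the outlier, while the quantile edges track where the mass of the distribution sits.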
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class Aggregation(builtins.str, enum.Enum):

What we use to calculate the score. EDGES - the distances between the edges. DENSITY - the percentage of values that fall in each bin. CUMULATIVE - the cumulative percentage of values that fall in the bins.

EDGES = <Aggregation.EDGES: 'Edges'>
DENSITY = <Aggregation.DENSITY: 'Density'>
CUMULATIVE = <Aggregation.CUMULATIVE: 'Cumulative'>
Inherited Members
enum.Enum
name
value
class Metric(builtins.str, enum.Enum):

How we calculate the score. MAXDIFF - the maximum difference between corresponding bins. SUMDIFF - the sum of the differences between corresponding bins. PSI - Population Stability Index.

MAXDIFF = <Metric.MAXDIFF: 'MaxDiff'>
SUMDIFF = <Metric.SUMDIFF: 'SumDiff'>
PSI = <Metric.PSI: 'PSI'>
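A standalone sketch (not SDK code) of how these three metrics could score the difference between a baseline and a window, using DENSITY aggregation (the fraction of values in each bin); the PSI line uses the standard formula, and the SDK's exact computation may differ:

```python
import numpy as np

# Standalone sketch (not SDK code): per-bin densities for a baseline
# and a window, scored under the three metrics.
baseline = np.array([0.25, 0.25, 0.25, 0.25])
window = np.array([0.10, 0.30, 0.30, 0.30])

max_diff = np.max(np.abs(window - baseline))  # MAXDIFF
sum_diff = np.sum(np.abs(window - baseline))  # SUMDIFF

# Standard PSI formula (assumed, not taken from the SDK source).
psi = np.sum((window - baseline) * np.log(window / baseline))
```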
Inherited Members
enum.Enum
name
value
class SummarizerConfig:

The summarizer specifies how the bins of the baseline and window should be compared.

SummarizerConfig()
def to_json(self) -> str:
class UnivariateContinousSummarizerConfig(SummarizerConfig):

The UnivariateContinousSummarizer analyzes one input or output feature (univariate) at a time. It expects the values to be continuous, or at least numerous enough to fall in various/all of the bins.

UnivariateContinousSummarizerConfig( bin_mode: wallaroo.assay_config.BinMode, aggregation: wallaroo.assay_config.Aggregation, metric: wallaroo.assay_config.Metric, num_bins: int, bin_weights: Optional[List[float]] = None, bin_width: Optional[float] = None, provided_edges: Optional[List[float]] = None, add_outlier_edges: bool = True)
class SummarizerBuilder(abc.ABC):

Helper class that provides a standard way to create an ABC using inheritance.

@abstractmethod
def build(self) -> wallaroo.assay_config.SummarizerConfig:
class UnivariateContinousSummarizerBuilder(SummarizerBuilder):

Builds the UnivariateContinousSummarizerConfig.

UnivariateContinousSummarizerBuilder()
def add_bin_mode( self, bin_mode: wallaroo.assay_config.BinMode, edges: Optional[List[float]] = None):

Sets the binning mode. If BinMode.PROVIDED is specified a list of edges is also required.

def add_num_bins(self, num_bins: int):

Sets the number of bins. If weights have been previously set, they must be set to None to allow changing the number of bins.

def add_bin_weights(self, weights: Optional[List[float]]):

Specifies the weighting to be given to the bins. The number of weights must be 2 larger than the number of bins, to accommodate outliers smaller and outliers larger than the values seen in the baseline. The passed-in values can be whole or real numbers and do not need to add up to 1 or any other specific value, as they are normalized during the score calculation phase. The weights passed in can be None, to remove previously specified weights and allow changing the number of bins.
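For example (a standalone sketch, not SDK code), with four bins you would pass six weights, and normalization happens at score time:

```python
# Standalone sketch (not SDK code) of the weight-count rule and the
# normalization applied during score calculation.
num_bins = 4

# num_bins + 2 weights: one for the left outlier bin, one per regular
# bin, and one for the right outlier bin.
weights = [0, 1, 1, 2, 2, 0]
assert len(weights) == num_bins + 2

# Weights need not sum to 1; they are normalized before use.
total = sum(weights)
normalized = [w / total for w in weights]
```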

def add_metric(self, metric: wallaroo.assay_config.Metric):

Sets the metric mode.

def add_aggregation(self, aggregation: wallaroo.assay_config.Aggregation):

Sets the aggregation style.

def add_bin_edges(self, edges: Optional[List[float]]):

Specifies the right-hand side (max value) of the bins. The number of edges must be equal to or one more than the number of bins. When equal to the number of bins, the edge for the left outlier bin is calculated from the baseline. When an additional edge is provided (one more than the number of bins), the first (lowest) value is used as the max value for the left outlier bin. The max value for the right-hand outlier bin is always the float MAX value.

class BaselineConfig:

Abstract base class for Baseline config objects. Currently only CalculatedBaseline (fixed window) and StaticBaseline are implemented.

BaselineConfig()
def to_json(self) -> str:
class CalculatedBaseline(BaselineConfig):

The CalculatedBaseline is calculated from the inferences from a specific time window.

CalculatedBaseline( pipeline_name: str, model_name: str, start: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, locations: List[str] = [])
class FixedBaseline(CalculatedBaseline):

The FixedBaseline is calculated from the inferences from a specific time window.

FixedBaseline( pipeline_name: str, model_name: str, start: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, locations: List[str] = [])
class StaticBaseline(BaselineConfig):

The StaticBaseline is pre-calculated data from the inferences in a specific time window.

StaticBaseline( count: int, min_: float, max_: float, mean: float, median: float, std: float, edges: List[float], edge_names: List[str], aggregated_values: List[float], aggregation: wallaroo.assay_config.Aggregation, start: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None)
class BaselineBuilder(abc.ABC):

Helper class that provides a standard way to create an ABC using inheritance.

@abstractmethod
def build(self) -> wallaroo.assay_config.BaselineConfig:
def to_json(self) -> str:
def ensure_tz(d: datetime.datetime) -> datetime.datetime:

Ensure the date is tz-aware. If naive, assume it is in UTC.
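A minimal sketch of the documented behavior (the SDK's actual implementation may differ):

```python
from datetime import datetime, timezone

# Sketch of the documented behavior (not the SDK's exact code):
# naive datetimes are assumed to be UTC; aware datetimes pass through.
def ensure_tz(d: datetime) -> datetime:
    if d.tzinfo is None:
        return d.replace(tzinfo=timezone.utc)
    return d
```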

class VectorBaselineBuilder(BaselineBuilder):

Helps create a config object for a VectorBaseline.

VectorBaselineBuilder(client: wallaroo.client.Client)
def add_baseline_data( self, baseline_data: numpy.ndarray) -> wallaroo.assay_config.VectorBaselineBuilder:

Add the baseline data.

Add the summarizer.

Create the StaticBaseline object.

class FixedWindowBaselineBuilder(BaselineBuilder):

Helps to easily create the config object for a FixedBaseline.

FixedWindowBaselineBuilder(client: wallaroo.client.Client, pipeline_name: str)
def add_model_name( self, model_name: str) -> wallaroo.assay_config.FixedWindowBaselineBuilder:

Specify the model to use in the baseline

def add_start( self, start: datetime.datetime) -> wallaroo.assay_config.FixedWindowBaselineBuilder:

Specify the start of the window for the baseline

def add_end( self, end: datetime.datetime) -> wallaroo.assay_config.FixedWindowBaselineBuilder:

Specify the end of the window for the baseline

def add_iopath(self, iopath: str) -> wallaroo.assay_config.FixedWindowBaselineBuilder:

Specify the path to the inference data

def add_location_filter( self, locations: List[str]) -> wallaroo.assay_config.FixedWindowBaselineBuilder:
def add_workspace_id( self, workspace_id: int) -> wallaroo.assay_config.FixedWindowBaselineBuilder:

Specify the workspace id for the inference data

Specify the summarizer to use

Create the FixedBaseline object.

class WindowConfig:

Configures a window to be compared against the baseline.

WindowConfig( pipeline_name: str, model_name: str, width: str, start: Optional[datetime.datetime] = None, interval: Optional[str] = None, path: Optional[str] = None, workspace_id: Optional[int] = None, locations: List[str] = [])
def to_json(self) -> str:
class WindowBuilder:

Helps build a WindowConfig. model and width are required but there are no good default values for them because they depend on the baseline. We leave it up to the assay builder to configure the window correctly after it is created.

WindowBuilder(pipeline_name: str)
def add_model_name(self, model_name: str):

The model name (model_id) that the window should analyze.

def add_width(self, **kwargs: int):

The width of the window to use when collecting data for analysis.

def add_interval(self, **kwargs: int):

How often a new analysis window should be created, i.e. the spacing between successive windows.

def add_location_filter(self, locations: List[str] = []):
def add_start(self, start: datetime.datetime):
def add_path(self, path: str):
def add_workspace_id(self, workspace_id: int):
def ConfigEncoder(o):

Used to format datetimes as needed when encoding to JSON.

class AssayConfig:

Configuration for an Assay record.

AssayConfig( client: Optional[wallaroo.client.Client], name: str, pipeline_id: int, pipeline_name: str, active: bool, status: str, baseline: wallaroo.assay_config.BaselineConfig, window: wallaroo.assay_config.WindowConfig, summarizer: wallaroo.assay_config.SummarizerConfig, warning_threshold: Optional[float], alert_threshold: float, run_until: Optional[datetime.datetime], workspace_id: Optional[int])
def to_json(self) -> str:
def interactive_run(self) -> wallaroo.assay.AssayAnalysisList:

Runs this assay interactively. The assay is not saved to the database, nor are analysis records saved to a Plateau topic. Useful for exploring pipeline inference data and experimenting with thresholds.

def interactive_baseline_run(self) -> Optional[wallaroo.assay.AssayAnalysis]:
def interactive_input_run_arrow( self, inferences: pandas.core.frame.DataFrame, labels: Optional[List[str]]) -> wallaroo.assay.AssayAnalysisList:
def interactive_input_run_legacy( self, inferences: List[Dict], labels: Optional[List[str]]) -> wallaroo.assay.AssayAnalysisList:
def interactive_input_run( self, inferences: Union[List[Dict], pandas.core.frame.DataFrame], labels: Optional[List[str]]) -> wallaroo.assay.AssayAnalysisList:

Analyzes the given inputs to create an interactive run for each feature column. The assay is not saved to the database, nor are analysis records saved to a Plateau topic. Useful for exploring inputs for possible causes when a difference is detected in the output.

class AssayBuilder:

Helps build an AssayConfig

AssayBuilder( client: wallaroo.client.Client, name: str, pipeline_id: int, pipeline_name: str, model_name: str, iopath: str, baseline_start: Optional[datetime.datetime] = None, baseline_end: Optional[datetime.datetime] = None, baseline_data: Optional[numpy.ndarray] = None)
def baseline_dataframe(self):
def baseline_histogram( self, bins: Union[int, str, NoneType] = None, log_scale: bool = False):
def baseline_kde(self, log_scale: bool = False):
def baseline_ecdf(self, log_scale: bool = False):
def upload(self) -> int:
def add_name(self, name: str):

Specify the assay name

def add_active(self, active: bool):

Specify if the assay is active or not

def add_iopath(self, iopath: str):

Specify what the assay should analyze. Should start with input or output and have zero-based indexes into row and column: for example, 'input 0 1' specifies the second column of the first input.
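A hypothetical helper (not part of the SDK) that makes the iopath format concrete:

```python
# Hypothetical helper (not part of the SDK): decompose an iopath
# string such as 'input 0 1' into its parts.
def parse_iopath(iopath: str):
    position, row, column = iopath.split()
    if position not in ("input", "output"):
        raise ValueError("iopath must start with 'input' or 'output'")
    return position, int(row), int(column)

# 'input 0 1' -> the second column (index 1) of the first input (index 0).
```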

def add_location_filter(self, locations: List[str]):
def fixed_baseline_builder(self):

Creates a fixed baseline builder for this assay builder.

def add_baseline(self, baseline: wallaroo.assay_config.BaselineConfig):

Adds a specific baseline created elsewhere.

def window_builder(self):

Returns this assay builder's window builder.

def add_window(self, window: wallaroo.assay_config.WindowConfig):

Adds a window created elsewhere.

def univariate_continuous_summarizer(self) -> wallaroo.assay_config.UnivariateContinousSummarizerBuilder:

Creates and adds a UCS (univariate continuous summarizer) to this assay builder.

def add_summarizer(self, summarizer: wallaroo.assay_config.SummarizerConfig):

Adds the summarizer created elsewhere to this builder.

def add_warning_threshold(self, warning_threshold: float):

Specify the warning threshold for this assay.

def add_alert_threshold(self, alert_threshold: float):

Specify the alert threshold for this assay.

def add_run_until(self, run_until: datetime.datetime):

How long this assay should run. Primarily useful for interactive runs to limit the number of analyses.

def calc_bins(num_samples: int, bins: Union[int, str, NoneType]) -> Union[str, int]:

If the user specifies a number of bins or a strategy for calculating it, use that. Otherwise use the minimum of the square root of the number of samples and 50.
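A sketch of the described heuristic (assumed from the docstring, not taken from the SDK source):

```python
import math

# Sketch (assumed from the docstring, not the SDK source): honor an
# explicit bin count or strategy name; otherwise use
# min(sqrt(num_samples), 50).
def calc_bins(num_samples: int, bins):
    if bins is not None:
        return bins
    return min(int(math.sqrt(num_samples)), 50)
```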

3 - wallaroo.auth

Handles authentication to the Wallaroo platform.

Performs a "device code"-style OAuth login flow.

The code is organized as follows:

  • Auth objects returned by create() should be placed on each request to platform APIs. Currently, we have the following types:

    • NoAuth: Does not modify requests
    • PlatformAuth: Places Authorization: Bearer XXX headers on each outgoing request
  • Objects derived from TokenFetcher know how to obtain an AccessToken from a particular provider:

    • KeycloakTokenFetcher: Fetches a token from Keycloak using a device code login flow
    • CachedTokenFetcher: Wraps another TokenFetcher and caches the value to a JSON file to reduce the number of user logins needed.
class AuthType(enum.Enum):

Defines all the supported auth types.

Handles conversions from string names to enum values.

NONE = <AuthType.NONE: 'none'>
SSO = <AuthType.SSO: 'sso'>
USER_PASSWORD = <AuthType.USER_PASSWORD: 'user_password'>
TEST_AUTH = <AuthType.TEST_AUTH: 'test_auth'>
TOKEN = <AuthType.TOKEN: 'token'>
ORCH = <AuthType.ORCH: 'orch'>
Inherited Members
enum.Enum
name
value
class TokenData(typing.NamedTuple):

TokenData(token, user_email, user_id)

TokenData(token: str, user_email: str, user_id: str)

Create new instance of TokenData(token, user_email, user_id)

token: str

Alias for field number 0

user_email: str

Alias for field number 1

user_id: str

Alias for field number 2

def to_dict(self) -> Dict[str, str]:
Inherited Members
builtins.tuple
index
count
def create( keycloak_addr: str, auth_type: Union[wallaroo.auth.AuthType, str, NoneType]) -> wallaroo.auth._WallarooAuth:

Returns an auth object of the corresponding type.

Parameters
  • str keycloak_addr: Address of the Keycloak instance to auth against
  • AuthType or str auth_type: Type of authentication to use
Returns

Auth object that can be passed to all requests calls

Raises
  • NotImplementedError: if auth_type is not recognized
def logout():

Removes cached values for all third-party auth providers.

This will not invalidate auth objects already created with create().

class AuthError(builtins.Exception):

Base type for all errors in this module.

AuthError(message: str, code: Optional[int] = None)
Inherited Members
builtins.BaseException
with_traceback
args
class TokenFetchError(AuthError):

Errors encountered while performing a login.

Inherited Members
builtins.BaseException
with_traceback
args
class TokenRefreshError(AuthError):

Errors encountered while refreshing an AccessToken.

Inherited Members
builtins.BaseException
with_traceback
args

4 - wallaroo.checks

class Expression:

Root base class for all model-checker expressions. Provides pythonic magic-method sugar for expression definitions.

Expression()
def model_names(self):
def as_json(self):
def one_of(self, *values):
@classmethod
def from_py(cls, value):

Creates an :py:Expression: from a given python value.

def top_json(self) -> Dict[str, object]:

Creates a top-level expression that can be passed to the model checker runtime.

class Function(Expression):

Root base class for all model-checker expressions. Provides pythonic magic-method sugar for expression definitions.

Function(op, args)
def model_names(self):
def as_json(self):
class BinOp(Expression):

Root base class for all model-checker expressions. Provides pythonic magic-method sugar for expression definitions.

BinOp(op, left, right)
def model_names(self):
def as_json(self):
class Variable(Expression):

Declares a model variable that can be used as an :py:Expression: in the model checker. Variables are identified by their model_name, a position of either "input" or "output", and the tensor index.

Variable(model_name, position, index)
def model_names(self):
def as_json(self):
def value_to_node(value):
class Value(Expression):

Root base class for all model-checker expressions. Provides pythonic magic-method sugar for expression definitions.

Value(value)
def model_names(self):
def as_json(self):
def is_prom_primitive(v):
class Aggregate:
Aggregate( name: str, promql_agg: str, inner_expression: wallaroo.checks.Expression, duration: datetime.timedelta, bucket_size: Optional[datetime.timedelta])
def expression(self):
def promql(self, gauge_name):
class Alert:
Alert(op, left, right)
def promql(self, gauge_name):
class DefinedFunction:
DefinedFunction(name)
class DefinedAggregate:
DefinedAggregate(name: str, promql_agg)
class Variables:
Variables(model, position)
def instrument( values: Dict[str, wallaroo.checks.Expression], gauges: List[str], validations: List[str]):
def dns_compliant(name: str):

Returns true if a string is compliant with the DNS label name requirements, ensuring it can be part of a full DNS host name.

def require_dns_compliance(name: str):

Validates that 'name' complies with DNS naming requirements or raises an exception
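A sketch of an RFC 1123 label check (the SDK's exact rules may differ slightly):

```python
import re

# Sketch of an RFC 1123 DNS label check (the SDK's exact rules may
# differ): lowercase alphanumerics and dashes, must start and end with
# an alphanumeric, at most 63 characters.
_LABEL = re.compile(r"^[a-z0-9]([a-z0-9-]{0,61}[a-z0-9])?$")

def dns_compliant(name: str) -> bool:
    return bool(_LABEL.match(name))
```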

5 - wallaroo.client

class Client:

Client handle to a Wallaroo platform instance.

Objects of this class serve as the entrypoint to Wallaroo platform functionality.

Client( api_endpoint: str = 'http://api-lb:8080', auth_endpoint: str = '', request_timeout: Optional[int] = None, auth_type: Optional[str] = None, gql_client: Optional[gql.client.Client] = None, interactive: Optional[bool] = None, time_format: str = '%Y-%d-%b %H:%M:%S')

Create a Client handle.

Parameters
  • str api_endpoint: Host/port of the platform API endpoint
  • str auth_endpoint: Host/port of the platform Keycloak instance
  • int request_timeout: Max timeout of web requests, in seconds
  • str auth_type: Authentication type to use. Can be one of: "none", "sso", "user_password".
  • bool interactive: If provided and True, some calls will print additional human information, or won't when False. If not provided, interactive defaults to True if running inside Jupyter and False otherwise.
  • str time_format: Preferred strftime format string for displaying timestamps in a human context.
@staticmethod
def get_urls( auth_type: Optional[str], api_endpoint: str, auth_endpoint: str) -> Tuple[Optional[str], str, str]:

Method to calculate the auth values specified as defaults, as params or in ENV vars. Made static to be testable without reaching out to SSO, etc.

def list_tags(self) -> wallaroo.tag.Tags:

List all tags on the platform.

Returns

A list of all tags on the platform.

def list_models(self) -> wallaroo.model.ModelList:

List all models on the platform.

Returns

A list of all models on the platform.

def list_deployments(self) -> List[wallaroo.deployment.Deployment]:

List all deployments (active or not) on the platform.

Returns

A list of all deployments on the platform.

def search_pipelines( self, search_term: Optional[str] = None, deployed: Optional[bool] = None, created_start: Optional[wallaroo.client.Datetime] = None, created_end: Optional[wallaroo.client.Datetime] = None, updated_start: Optional[wallaroo.client.Datetime] = None, updated_end: Optional[wallaroo.client.Datetime] = None) -> wallaroo.pipeline_version.PipelineVersionList:

Search for pipelines. All parameters are optional, in which case the result is the same as list_pipelines(). All times are strings to be parsed by datetime.fromisoformat. Example:

 myclient.search_pipelines(created_end='2022-04-19 13:17:59+00:00', search_term="foo")
Parameters
  • str search_term: Will be matched against tags and model names. Example: "footag123".
  • bool deployed: Pipeline was deployed or not
  • str created_start: Pipeline was created at or after this time
  • str created_end: Pipeline was created at or before this time
  • str updated_start: Pipeline was updated at or after this time
  • str updated_end: Pipeline was updated at or before this time
Returns

A list of pipeline versions matching the search criteria.

def search_pipeline_versions( self, search_term: Optional[str] = None, deployed: Optional[bool] = None, created_start: Optional[wallaroo.client.Datetime] = None, created_end: Optional[wallaroo.client.Datetime] = None, updated_start: Optional[wallaroo.client.Datetime] = None, updated_end: Optional[wallaroo.client.Datetime] = None) -> wallaroo.pipeline_version.PipelineVersionList:

Search for pipeline versions. All parameters are optional. All times are strings to be parsed by datetime.fromisoformat. Example:

myclient.search_pipeline_versions(created_end='2022-04-19 13:17:59+00:00', search_term="foo")

Parameters
  • str search_term: Will be matched against tags and model names. Example: "footag123".
  • bool deployed: Pipeline was deployed or not
  • str created_start: Pipeline was created at or after this time
  • str created_end: Pipeline was created at or before this time
  • str updated_start: Pipeline was updated at or after this time
  • str updated_end: Pipeline was updated at or before this time
Returns

A list of pipeline versions matching the search criteria.

def search_my_models( self, search_term: Optional[str] = None, uploaded_time_start: Optional[wallaroo.client.Datetime] = None, uploaded_time_end: Optional[wallaroo.client.Datetime] = None) -> wallaroo.model_version.ModelVersionList:

Search models owned by you. Example:

client.search_my_models(search_term="my_model")

Parameters
  • search_term: Searches the following metadata: names, shas, versions, file names, and tags
  • uploaded_time_start: Inclusive time of upload
  • uploaded_time_end: Inclusive time of upload
Returns

ModelVersionList

def search_my_model_versions( self, search_term: Optional[str] = None, uploaded_time_start: Optional[wallaroo.client.Datetime] = None, uploaded_time_end: Optional[wallaroo.client.Datetime] = None) -> wallaroo.model_version.ModelVersionList:

Search model versions owned by you. Example:

client.search_my_model_versions(search_term="my_model")

Parameters
  • search_term: Searches the following metadata: names, shas, versions, file names, and tags
  • uploaded_time_start: Inclusive time of upload
  • uploaded_time_end: Inclusive time of upload
Returns

ModelVersionList

def search_models( self, search_term: Optional[str] = None, uploaded_time_start: Optional[wallaroo.client.Datetime] = None, uploaded_time_end: Optional[wallaroo.client.Datetime] = None) -> wallaroo.model_version.ModelVersionList:

Search all models you have access to.

Parameters
  • search_term: Searches the following metadata: names, shas, versions, file names, and tags
  • uploaded_time_start: Inclusive time of upload
  • uploaded_time_end: Inclusive time of upload
Returns

ModelVersionList

def search_model_versions( self, search_term: Optional[str] = None, uploaded_time_start: Optional[wallaroo.client.Datetime] = None, uploaded_time_end: Optional[wallaroo.client.Datetime] = None) -> wallaroo.model_version.ModelVersionList:

Search all model versions you have access to. Example:

client.search_model_versions(search_term="my_model")

Parameters
  • search_term: Searches the following metadata: names, shas, versions, file names, and tags
  • uploaded_time_start: Inclusive time of upload
  • uploaded_time_end: Inclusive time of upload
Returns

ModelVersionList

def get_user_by_email(self, email: str) -> Optional[wallaroo.user.User]:

Find a user by email

def deactivate_user(self, email: str) -> None:

Deactivates an existing user of the platform

Deactivated users cannot log into the platform. Deactivated users do not count towards the number of allotted user seats from the license.

The Models and Pipelines owned by the deactivated user are not removed from the platform.

Parameters
  • str email: The email address of the user to deactivate.
Returns

None

def activate_user(self, email: str) -> None:

Activates an existing user of the platform that had been previously deactivated.

Activated users can log into the platform.

Parameters
  • str email: The email address of the user to activate.
Returns

None

def list_users(self) -> List[wallaroo.user.User]:

List of all Users on the platform

Returns

A list of all Users on the platform.

def upload_model( self, name: str, path: Union[str, pathlib.Path], framework: Optional[wallaroo.framework.Framework] = None, input_schema: Optional[pyarrow.lib.Schema] = None, output_schema: Optional[pyarrow.lib.Schema] = None, convert_wait: Optional[bool] = True, arch: Optional[wallaroo.engine_config.Architecture] = None) -> wallaroo.model_version.ModelVersion:

Upload a model defined by a file as a new model variant.

Parameters
  • name: str The name of the model of which this is a variant. Names must be ASCII alpha-numeric characters or dash (-) only.
  • path: Union[str, pathlib.Path] Path of the model file to upload.
  • framework: Optional[Framework] Supported model frameworks. Use models from Framework Enum. Example: Framework.PYTORCH, Framework.TENSORFLOW
  • input_schema: Optional pa.Schema Input schema, required for flavors other than ONNX, Tensorflow, and Python
  • output_schema: Optional pa.Schema Output schema, required for flavors other than ONNX, Tensorflow, and Python
  • convert_wait: Optional bool Defaults to True. Specifies if method should return when conversion is over or not.
Returns

The created Model.

def register_model_image(self, name: str, image: str) -> wallaroo.model_version.ModelVersion:

Registers an MLFlow model as a new model.

Parameters
  • str name: The name of the model of which this is a variant. Names must be ASCII alpha-numeric characters or dash (-) only.
  • str image: Image name of the MLFlow model to register.
Returns

The created Model.

def model_by_name( self, model_class: str, model_name: str) -> wallaroo.model_version.ModelVersion:

Fetch a Model by name.

Parameters
  • str model_class: Name of the model class.
  • str model_name: Name of the variant within the specified model class.
Returns

The Model with the corresponding model and variant name.

def model_version_by_name( self, model_class: str, model_name: str) -> wallaroo.model_version.ModelVersion:

Fetch a Model version by name.

Parameters
  • str model_class: Name of the model class.
  • str model_name: Name of the variant within the specified model class.
Returns

The Model with the corresponding model and variant name.

def deployment_by_name(self, deployment_name: str) -> wallaroo.deployment.Deployment:

Fetch a Deployment by name.

Parameters
  • str deployment_name: Name of the deployment.
Returns

The Deployment with the corresponding name.

def pipelines_by_name(self, pipeline_name: str) -> List[wallaroo.pipeline.Pipeline]:

Fetch Pipelines by name.

Parameters
  • str pipeline_name: Name of the pipeline.
Returns

A list of Pipelines with the corresponding name.

def list_pipelines(self) -> List[wallaroo.pipeline.Pipeline]:

List all pipelines on the platform.

Returns

A list of all pipelines on the platform.

def build_pipeline(self, pipeline_name: str) -> wallaroo.pipeline.Pipeline:

Starts building a pipeline with the given pipeline_name, returning a PipelineConfigBuilder.

When completed, the pipeline can be uploaded with .upload()

Parameters
  • pipeline_name string: Name of the pipeline, must be composed of ASCII alpha-numeric characters plus dash (-).
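A minimal sketch of the build-and-deploy flow (the pipeline and model names are placeholders, and add_model_step/deploy on the returned object are assumptions not documented in this section):

```python
import wallaroo

wl = wallaroo.Client()
model = wl.model_by_name("my-model-class", "my-model")  # placeholder names

pipeline = wl.build_pipeline("my-pipeline")
pipeline.add_model_step(model)   # assumed step-configuration API
pipeline.deploy()                # assumed; see the wallaroo.deployment section
```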
def create_value_split_experiment( self, name: str, meta_key: str, default_model: wallaroo.model_config.ModelConfig, challenger_models: List[Tuple[Any, wallaroo.model_config.ModelConfig]]) -> wallaroo.pipeline.Pipeline:

Creates a new PipelineVariant of a "value-split experiment" type.

Parameters
  • str name: Name of the Pipeline
  • meta_key str: Inference input key on which to redirect inputs to experiment models.
  • default_model ModelConfig: Model to send inferences by default.
  • challenger_models List[Tuple[Any, ModelConfig]]: A list of meta_key values -> Models to send inferences. If the inference data referred to by meta_key is equal to one of the keys in this tuple, that inference is redirected to the corresponding model instead of the default model.
@staticmethod
def cleanup_arrow_data_for_display(arrow_data: pyarrow.lib.Table) -> pyarrow.lib.Table:

Cleans up the inference result and log data from engine / plateau for display (ux) purposes.

def get_logs( self, topic: str, limit: Optional[int] = None, start_datetime: Optional[datetime.datetime] = None, end_datetime: Optional[datetime.datetime] = None, dataset: Optional[List[str]] = None, dataset_exclude: Optional[List[str]] = None, dataset_separator: Optional[str] = None, directory: Optional[str] = None, file_prefix: Optional[str] = None, data_size_limit: Optional[str] = None, arrow: Optional[bool] = False) -> Tuple[Union[pyarrow.lib.Table, pandas.core.frame.DataFrame, wallaroo.logs.LogEntries, NoneType], Optional[str]]:

Get logs for the given topic.

Parameters
  • topic: str The topic to get logs for.
  • limit: Optional[int] The maximum number of logs to return.
  • start_datetime: Optional[datetime] The start time to get logs for.
  • end_datetime: Optional[datetime] The end time to get logs for.
  • dataset: Optional[List[str]] By default this is set to ["*"], which returns ["time", "in", "out", "check_failures"]. Other available options: ["metadata"].
  • dataset_exclude: Optional[List[str]] If set, allows user to exclude parts of dataset.
  • dataset_separator: Optional[Union[Sequence[str], str]] If set to ".", return dataset will be flattened.
  • directory: Optional[str] If set, logs will be exported to a file in the given directory.
  • file_prefix: Optional[str] Prefix to name the exported file. Required if directory is set.
  • data_size_limit: Optional[str] The maximum size of the exported data in MB. Size includes all files within the provided directory. By default, the data_size_limit will be set to 100MB.
  • arrow: Optional[bool] If set to True, return logs as an Arrow Table. Else, returns Pandas DataFrame.
Returns

Tuple[Union[pa.Table, pd.DataFrame, LogEntries], str] The logs and status.
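A hedged usage sketch of the parameters above ("pipeline-logs" is a placeholder topic name; see get_topic_name below for obtaining a real one):

```python
from datetime import datetime, timedelta, timezone

import wallaroo

wl = wallaroo.Client()
end = datetime.now(timezone.utc)

logs, status = wl.get_logs(
    topic="pipeline-logs",        # placeholder topic
    limit=1000,
    start_datetime=end - timedelta(hours=1),
    end_datetime=end,
    dataset=["time", "in", "out", "check_failures"],
    arrow=False,                  # return a pandas DataFrame, not an Arrow Table
)
```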

def security_logs(self, limit: int) -> List[dict]:

This function is not available in this release.

def get_raw_logs( self, topic: str, start: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, limit: int = 100000, parse: bool = False, dataset: Optional[List[str]] = None, dataset_exclude: Optional[List[str]] = None, dataset_separator: Optional[str] = None, verbose: bool = False) -> Union[List[Dict[str, Any]], pandas.core.frame.DataFrame]:

Gets logs from Plateau for a particular time window without attempting to convert them to Inference LogEntries. Logs can be returned as strings or, with parse=True, as JSON parsed into lists and dicts.

Parameters
  • topic str: The name of the topic to query
  • start Optional[datetime]: The start of the time window
  • end Optional[datetime]: The end of the time window
  • limit int: The number of records to retrieve. Note retrieving many records may be a performance bottleneck.
  • parse bool: Whether to attempt to parse each record as a JSON object.
  • verbose bool: Prints out info to help diagnose issues.
def get_raw_pipeline_inference_logs( self, topic: str, start: datetime.datetime, end: datetime.datetime, model_name: Optional[str] = None, limit: int = 100000, verbose: bool = False) -> List[Union[Dict[str, Any], pandas.core.frame.DataFrame]]:

Gets logs from Plateau for a particular time window and filters them for the model specified.

Parameters
  • pipeline_name str: The name/pipeline_id of the pipeline to query
  • topic str: The name of the topic to query
  • start Optional[datetime]: The start of the time window
  • end Optional[datetime]: The end of the time window
  • model_name str: The name of the specific model to filter on, if any.
  • limit int: The number of records to retrieve. Note retrieving many records may be a performance bottleneck.
  • verbose bool: Prints out info to help diagnose issues.
def get_pipeline_inference_dataframe( self, topic: str, start: datetime.datetime, end: datetime.datetime, model_name: Optional[str] = None, limit: int = 100000, verbose=False) -> pandas.core.frame.DataFrame:
def get_assay_results( self, assay_id: int, start: datetime.datetime, end: datetime.datetime) -> wallaroo.assay.AssayAnalysisList:

Gets the assay results for a particular time window, parses them, and returns an AssayAnalysisList of AssayAnalysis objects.

Parameters
  • assay_id: int The id of the assay we are looking for.
  • start: datetime The start of the time window. If timezone info not set, uses UTC timezone by default.
  • end: datetime The end of the time window. If timezone info not set, uses UTC timezone by default.
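A hedged sketch of fetching a day of results (the assay id is a placeholder, and the status attribute on each AssayAnalysis is an assumption not documented here):

```python
from datetime import datetime, timedelta, timezone

import wallaroo

wl = wallaroo.Client()
end = datetime.now(timezone.utc)  # timezone-aware; naive datetimes default to UTC

results = wl.get_assay_results(
    assay_id=1,                   # placeholder id, e.g. returned by upload_assay
    start=end - timedelta(days=1),
    end=end,
)
for analysis in results:          # AssayAnalysisList iterates AssayAnalysis items
    print(analysis.status)        # assumed attribute; exact fields may differ
```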
def build_assay( self, assay_name: str, pipeline: wallaroo.pipeline.Pipeline, model_name: str, iopath: str, baseline_start: Optional[datetime.datetime] = None, baseline_end: Optional[datetime.datetime] = None, baseline_data: Optional[numpy.ndarray] = None) -> wallaroo.assay_config.AssayBuilder:

Creates an AssayBuilder that can be used to configure and create Assays.

Parameters
  • assay_name: str Human friendly name for the assay
  • pipeline: Pipeline The pipeline this assay will work on
  • model_name: str The model that this assay will monitor
  • iopath: str The path to the input or output of the model that this assay will monitor.
  • baseline_start: Optional[datetime] The start time for the inferences to use as the baseline
  • baseline_end: Optional[datetime] The end time for the inferences to use as the baseline. Windows start immediately after the baseline window and are run at regular intervals continuously until the assay is deactivated or deleted.
  • baseline_data: Optional[np.ndarray] Use this to load existing baseline data at assay creation time.
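A sketch combining build_assay with upload_assay below (names, times, and the iopath are placeholders; calling build() on the AssayBuilder to produce an AssayConfig is an assumption):

```python
from datetime import datetime, timedelta, timezone

import wallaroo

wl = wallaroo.Client()
pipeline = wl.pipelines_by_name("my-pipeline")[0]   # placeholder name

baseline_end = datetime.now(timezone.utc)
builder = wl.build_assay(
    assay_name="output-drift",
    pipeline=pipeline,
    model_name="my-model",
    iopath="output dense_1 0",    # placeholder input/output path
    baseline_start=baseline_end - timedelta(days=1),
    baseline_end=baseline_end,
)
assay_id = wl.upload_assay(builder.build())  # assumed finalizer
```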
def upload_assay(self, config: wallaroo.assay_config.AssayConfig) -> int:

Creates an assay in the database.

Parameters
  • config AssayConfig: The configuration for the assay to create.
Returns

The identifier (int) for the assay that was created.

def list_assays(self) -> List[wallaroo.assay.Assay]:

List all assays on the platform.

Returns

A list of all assays on the platform.

def create_tag(self, tag_text: str) -> wallaroo.tag.Tag:

Create a new tag with the given text.

def create_workspace(self, workspace_name: str) -> wallaroo.workspace.Workspace:

Create a new workspace with the current user as its first owner.

Parameters
  • str workspace_name: Name of the workspace, must be composed of ASCII alpha-numeric characters plus dash (-)
def list_workspaces(self) -> List[wallaroo.workspace.Workspace]:

List all workspaces on the platform which this user has permission to see.

Returns

A list of all workspaces on the platform.

def set_current_workspace( self, workspace: wallaroo.workspace.Workspace) -> wallaroo.workspace.Workspace:

Any calls involving pipelines or models will use the given workspace from then on.

def get_current_workspace(self) -> wallaroo.workspace.Workspace:

Return the current workspace. See also set_current_workspace.

def invite_user(self, email, password=None):
def get_topic_name(self, pipeline_pk_id: int) -> str:
def shim_token(self, token_data: wallaroo.auth.TokenData):
def list_orchestrations(self):

List all Orchestrations in the current workspace.

Returns

A List containing all Orchestrations in the current workspace.

def upload_orchestration( self, bytes_buffer: Optional[bytes] = None, path: Optional[str] = None, name: Optional[str] = None, file_name: Optional[str] = None):

Upload a file to be packaged and used as an Orchestration.

The uploaded artifact must be a ZIP file which contains:

  • User code. If main.py exists, then that will be used as the task entrypoint. Otherwise, the first main.py found in any subdirectory will be used as the entrypoint.
  • Optional: A standard Python requirements.txt for any dependencies to be provided in the task environment. The Wallaroo SDK will already be present and should not be mentioned. Multiple requirements.txt files are not allowed.
  • Optional: Any other artifacts desired for runtime, including data or code.
Parameters
  • Optional[str] path: The path to the file on your filesystem that will be uploaded as an Orchestration.
  • Optional[bytes] bytes_buffer: The raw bytes to upload to be used as an Orchestration. Cannot be used with the path param.
  • Optional[str] name: An optional descriptive name for this Orchestration.
  • Optional[str] file_name: An optional filename to describe your Orchestration when using the bytes_buffer param. Ignored when path is used.
Returns

The Orchestration that was uploaded.

Raises OrchestrationUploadFailed if a server-side error prevented the upload from succeeding.
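A sketch of uploading a local ZIP (the path and name are placeholders):

```python
import wallaroo

wl = wallaroo.Client()

# my_orchestration.zip is a placeholder; it must contain a main.py entrypoint
# and may include a requirements.txt and other runtime artifacts.
orch = wl.upload_orchestration(
    path="./my_orchestration.zip",
    name="nightly-batch",         # optional descriptive name
)
```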

def list_tasks(self, killed: bool = False):

List all Tasks in the current Workspace.

Returns

A List containing Task objects.

def get_task_by_id(self, task_id: str):

Retrieve a Task by its ID.

Parameters
  • str task_id: The ID of the Task to retrieve.
Returns

A Task object.

def in_task(self) -> bool:

Determines if this code is inside an orchestration task.

Returns

True if running in a task.

def task_args(self) -> Dict[Any, Any]:

When running inside a task (see in_task()), obtain arguments passed to the task.

Returns

Dict of the arguments
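Inside an orchestration's user code, in_task() and task_args() can be combined so the same script runs both locally and as a task (the "pipeline_name" argument key below is a placeholder chosen by whoever created the task):

```python
import wallaroo

wl = wallaroo.Client()

if wl.in_task():
    args = wl.task_args()
    pipeline_name = args.get("pipeline_name", "my-pipeline")  # placeholder key
else:
    pipeline_name = "my-pipeline"  # fallback for local runs
```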

def list_connections(self) -> wallaroo.connection.ConnectionList:

List all Connections defined in the platform.

Returns

List of Connections in the whole platform.

def get_connection(self, name: str) -> wallaroo.connection.Connection:

Retrieves a Connection by its name.

Returns

Connection to an external data source.

def create_connection( self, name: str, connection_type: str, details: Dict[str, Any]) -> wallaroo.connection.Connection:

Creates a Connection with the given name, type, and type-specific details.

Returns

Connection to an external data source.
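A sketch of defining and retrieving a connection (the name, type label, and details dict are illustrative placeholders, not a schema defined by this document):

```python
import wallaroo

wl = wallaroo.Client()

conn = wl.create_connection(
    name="sales-db",
    connection_type="BIGQUERY",            # placeholder type label
    details={"project_id": "my-project"},  # placeholder details
)
same = wl.get_connection(name="sales-db")
```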

def create_model_registry( self, name: str, token: str, url: str, workspace_id: Optional[int] = None) -> wallaroo.model_registry.ModelRegistry:

Create a Model Registry connection in this workspace that can be reused across workspaces.

Parameters
  • name str A descriptive name for this registry
  • token str A Bearer token necessary for accessing this Registry.
  • url str The root URL for this registry. It should NOT include /api/2.0/mlflow as part of it.
  • workspace_id int The ID of the workspace to attach this registry to, i.e. client.get_current_workspace().id().
Returns

A ModelRegistry object.

def list_model_registries(self, workspace_id: Optional[int] = None):
def get_email_by_id(self, id: str):
def remove_edge(self, name: str):

Remove an edge from a published pipeline.

Parameters
  • str name: The name of the edge that will be removed. This is not limited to this pipeline.
def mlops(self) -> wallaroo.wallaroo_ml_ops_api_client.client.Client:

6 - wallaroo.comment

class Comment(wallaroo.object.Object):

Comment that may be attached to models and pipelines.

Comment( client: Optional[wallaroo.client.Client], data: Dict[str, Any], standalone=False)

Base constructor.

Each object requires:

  • a GraphQL client - in order to fill its missing members dynamically
  • an initial data blob - typically from unserialized JSON, contains at least the data for required members (typically the object's primary key) and optionally other data members.
def id(self) -> int:
def user_id(*args, **kwargs):
def message(*args, **kwargs):
def updated_at(*args, **kwargs):
def list_model_versions(self) -> List[wallaroo.model_version.ModelVersion]:

Lists the models this comment is on.

def list_pipelines(self) -> List[wallaroo.pipeline.Pipeline]:

Lists the pipelines this comment is on.

def add_to_model(self, model_pk_id: int):
def remove_from_model(self, model_id: int):
def add_to_pipeline(self, pipeline_id: int):
def remove_from_pipeline(self, pipeline_pk_id: int):

7 - wallaroo.connection

class Connection(wallaroo.object.Object):

Connection to an external data source or destination.

Connection( client: wallaroo.client.Client, data: Dict[str, Any], standalone=False)

Base constructor.

Each object requires:

  • a GraphQL client - in order to fill its missing members dynamically
  • an initial data blob - typically from unserialized JSON, contains at least the data for required members (typically the object's primary key) and optionally other data members.
@staticmethod
def list_connections( client: wallaroo.client.Client, workspace_id: Optional[int] = None) -> wallaroo.connection.ConnectionList:
@staticmethod
def get_connection( client: wallaroo.client.Client, name: str) -> wallaroo.connection.Connection:
@staticmethod
def create_connection( client: wallaroo.client.Client, name: str, connection_type: str, details: Dict[str, Any]) -> wallaroo.connection.Connection:
def delete_connection(self):
def add_connection_to_workspace(self, workspace_id: int):
def remove_connection_from_workspace(self, workspace_id: int):
def id(self):
def name(*args, **kwargs):
def connection_type(*args, **kwargs):
def details(*args, **kwargs):
def created_at(*args, **kwargs):
def workspace_names(*args, **kwargs):
class ConnectionList(typing.List[wallaroo.connection.Connection]):

Wraps a list of connections for display in a display-aware environment like Jupyter.

Inherited Members
builtins.list
list
clear
copy
append
insert
extend
pop
remove
index
count
reverse
sort

8 - wallaroo.datasizeunit

class DataSizeUnit(enum.Enum):

Data size limits for exported pipeline log files

KiB = <DataSizeUnit.KiB: 'KiB'>
MiB = <DataSizeUnit.MiB: 'MiB'>
GiB = <DataSizeUnit.GiB: 'GiB'>
TiB = <DataSizeUnit.TiB: 'TiB'>
@staticmethod
def from_string(unit_str):
def calculate_bytes(self, size):
Inherited Members
enum.Enum
name
value
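A hedged sketch of how a data-size unit might map to bytes, mirroring the KiB/MiB/GiB/TiB units above. The SDK's actual from_string and calculate_bytes implementations may differ; this stand-alone re-implementation is for illustration only.

```python
from enum import Enum

class DataSizeUnit(Enum):
    """Data size units for exported pipeline log files (illustrative sketch)."""
    KiB = "KiB"
    MiB = "MiB"
    GiB = "GiB"
    TiB = "TiB"

    @staticmethod
    def from_string(unit_str: str) -> "DataSizeUnit":
        # Enum lookup by value; raises ValueError on an unknown unit string.
        return DataSizeUnit(unit_str)

    def calculate_bytes(self, size: int) -> int:
        # Binary (1024-based) units: KiB = 1024**1, MiB = 1024**2, etc.
        exponent = {"KiB": 1, "MiB": 2, "GiB": 3, "TiB": 4}[self.value]
        return size * 1024 ** exponent
```

For example, a 100 MiB limit resolves to 100 * 1024 * 1024 bytes.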

9 - wallaroo.deployment

class WaitForError(builtins.Exception):

Common base class for all non-exit exceptions.

WaitForError(message: str, status: Optional[Dict[str, Any]])
Inherited Members
builtins.BaseException
with_traceback
args
class WaitForDeployError(builtins.RuntimeError):

Unspecified run-time error.

WaitForDeployError(message: str)
Inherited Members
builtins.BaseException
with_traceback
args
def hack_pandas_dataframe_order(df):
class Deployment(wallaroo.object.Object):

Base class for all backend GraphQL API objects.

This class serves as a framework for API objects to be constructed based on a partially-complete JSON response, and to fill in their remaining members dynamically if needed.

Deployment(client: Optional[wallaroo.client.Client], data: Dict[str, Any])

Base constructor.

Each object requires:

  • a GraphQL client - in order to fill its missing members dynamically
  • an initial data blob - typically from unserialized JSON, contains at least the data for required members (typically the object's primary key) and optionally other data members.
def id(self) -> int:
def name(*args, **kwargs):
def deployed(*args, **kwargs):
def model_configs(*args, **kwargs):
def pipeline_versions(*args, **kwargs):
def pipeline_name(*args, **kwargs):
def engine_config(*args, **kwargs):
def deploy(self) -> wallaroo.deployment.Deployment:

Deploys this deployment, if it is not already deployed.

If the deployment is already deployed, this is a no-op.

def undeploy(self) -> wallaroo.deployment.Deployment:

Shuts down this deployment, if it is deployed.

If the deployment is already undeployed, this is a no-op.

def status(self) -> Dict[str, Any]:

Returns a dict of deployment status useful for determining if a deployment has succeeded.

Returns

Dict of deployment internal state information.

def check_limit_status(self):
def wait_for_running(self, timeout: Optional[int] = None) -> wallaroo.deployment.Deployment:

Waits for the deployment status to enter the "Running" state.

Will wait up to "timeout_request" seconds for the deployment to enter that state. This is set in the "Client" object constructor. Will raise various exceptions on failures.

Returns

The deployment, for chaining.

def wait_for_undeployed(self) -> wallaroo.deployment.Deployment:

Waits for the deployment to end.

Will wait up to "timeout_request" seconds for the deployment to enter that state. This is set in the "Client" object constructor. Will raise various exceptions on failures.

Returns

The deployment, for chaining.

def infer( self, tensor: Union[Dict[str, Any], List[Any], pandas.core.frame.DataFrame, pyarrow.lib.Table], timeout: Union[int, float, NoneType] = None, dataset: Optional[List[str]] = None, dataset_exclude: Optional[List[str]] = None, dataset_separator: Optional[str] = None):

Returns an inference result on this deployment, given a tensor.

Parameters
  • tensor: Union[Dict[str, Any], List[Any], pd.DataFrame, pa.Table]. The tensor to be sent to run inference on.
  • timeout: Optional[Union[int, float]] Requests will time out after the given number of seconds is exceeded. Defaults to 15 seconds.
  • dataset: Optional[List[str]] By default this is set to ["*"] which returns, ["time", "in", "out", "check_failures"]. Other available options - ["metadata"]
  • dataset_exclude: Optional[List[str]] If set, allows user to exclude parts of dataset.
  • dataset_separator: Optional[str] If set to ".", returned dataset will be flattened.
Returns

InferenceResult in dictionary, dataframe or arrow format.
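A sketch of running inference with a pandas DataFrame (the deployment name and input column are placeholders; the column names must match the model's actual input schema):

```python
import pandas as pd
import wallaroo

wl = wallaroo.Client()
deployment = wl.deployment_by_name("my-deployment")  # placeholder name

tensor = pd.DataFrame({"input": [[0.1] * 10]})       # placeholder column/shape
result = deployment.infer(
    tensor,
    timeout=30,                # override the 15-second default
    dataset=["time", "out"],   # restrict the returned fields
)
```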

def infer_from_file( self, filename: Union[str, pathlib.Path], data_format: Optional[str] = None, timeout: Union[int, float, NoneType] = None, dataset: Optional[List[str]] = None, dataset_exclude: Optional[List[str]] = None, dataset_separator: Optional[str] = None) -> Union[List[wallaroo.inference_result.InferenceResult], pandas.core.frame.DataFrame, pyarrow.lib.Table]:

This method is used to run inference on a deployment using a file. Supported formats are .arrow (Apache Arrow) and .json, where the JSON contains data either in the pandas records format or the Wallaroo custom JSON format.

Parameters
  • filename: Union[str, pathlib.Path]. The file to be sent to run inference on.
  • data_format: Optional[str]. The format of the data in the file. If not provided, the format will be inferred from the file extension.
  • timeout: Optional[Union[int, float]] Requests will time out after the given number of seconds is exceeded. Defaults to 15 seconds.
  • dataset: Optional[List[str]] By default this is set to ["*"] which returns, ["time", "in", "out", "check_failures"]. Other available options - ["metadata"]
  • dataset_exclude: Optional[List[str]] If set, allows user to exclude parts of dataset.
  • dataset_separator: Optional[str] If set to ".", returned dataset will be flattened.
Returns

InferenceResult in dictionary, dataframe or arrow format.

async def async_infer( self, tensor: Union[Dict[str, Any], List[Any], pandas.core.frame.DataFrame, pyarrow.lib.Table], async_client: httpx.AsyncClient, timeout: Union[int, float, NoneType] = None, retries: Optional[int] = None, dataset: Optional[List[str]] = None, dataset_exclude: Optional[List[str]] = None, dataset_separator: Optional[str] = None):
def replace_model( self, model_version: wallaroo.model_version.ModelVersion) -> wallaroo.deployment.Deployment:

Replaces the current model with a default-configured Model.

Parameters
  • ModelVersion model_version: Model variant to replace current model with
def replace_configured_model( self, model_config: wallaroo.model_config.ModelConfig) -> wallaroo.deployment.Deployment:

Replaces the current model with a configured variant.

Parameters
  • ModelConfig model_config: Configured model to replace current model with
def internal_url(self) -> str:

Returns the internal inference URL that is only reachable from inside of the Wallaroo cluster by SDK instances deployed in the cluster.

If both pipelines and models are configured on the Deployment, this gives preference to pipelines. The returned URL is always for the first configured pipeline or model.

def url(self) -> str:

Returns the inference URL.

If both pipelines and models are configured on the Deployment, this gives preference to pipelines. The returned URL is always for the first configured pipeline or model.

def logs( self, limit: int = 100, valid: Optional[bool] = None) -> wallaroo.logs.LogEntries:

Deployment.logs() has been removed. Please use pipeline.logs() instead.

10 - wallaroo.deployment_config

class DeploymentConfig(typing.Dict):
def guarantee_workspace_id( self, workspace_id: Optional[int]) -> wallaroo.deployment_config.DeploymentConfig:
Inherited Members
builtins.dict
get
setdefault
pop
popitem
keys
items
values
update
fromkeys
clear
copy
class DeploymentConfigBuilder:
DeploymentConfigBuilder(workspace_id: Optional[int] = None)
def replica_count(self, count: int) -> wallaroo.deployment_config.DeploymentConfigBuilder:
def replica_autoscale_min_max(self, maximum: int, minimum: int = 0):

Configures the minimum and maximum for autoscaling

def autoscale_cpu_utilization(self, cpu_utilization_percentage: int):

Sets the average CPU utilization percentage at which to scale.

def disable_autoscale(self):

Disables autoscaling in the deployment configuration

def cpus( self, core_count: int) -> wallaroo.deployment_config.DeploymentConfigBuilder:
def deployment_label(self, label: str) -> wallaroo.deployment_config.DeploymentConfigBuilder:
def gpus( self, gpu_count: int) -> wallaroo.deployment_config.DeploymentConfigBuilder:
def memory( self, memory_spec: str) -> wallaroo.deployment_config.DeploymentConfigBuilder:
def lb_cpus( self, core_count: int) -> wallaroo.deployment_config.DeploymentConfigBuilder:
def lb_memory( self, memory_spec: int) -> wallaroo.deployment_config.DeploymentConfigBuilder:
def python_load_timeout_secs( self, timeout_secs: int) -> wallaroo.deployment_config.DeploymentConfigBuilder:
def sidekick_gpus( self, model_version: wallaroo.model_version.ModelVersion, gpu_count: int) -> wallaroo.deployment_config.DeploymentConfigBuilder:

Sets the number of GPUs to be used for the model's sidekick container. Only affects image-based models (e.g. MLFlow models) in a deployment.

Parameters
  • ModelVersion model_version: The sidekick model to configure.
  • int gpu_count: Number of GPUs to use in this sidekick.
Returns

This DeploymentConfigBuilder instance for chaining.

def sidekick_cpus( self, model_version: wallaroo.model_version.ModelVersion, core_count: int) -> wallaroo.deployment_config.DeploymentConfigBuilder:

Sets the number of CPUs to be used for the model's sidekick container. Only affects image-based models (e.g. MLFlow models) in a deployment.

Parameters
  • ModelVersion model_version: The sidekick model to configure.
  • int core_count: Number of CPU cores to use in this sidekick.
Returns

This DeploymentConfigBuilder instance for chaining.

def sidekick_memory( self, model_version: wallaroo.model_version.ModelVersion, memory_spec: str) -> wallaroo.deployment_config.DeploymentConfigBuilder:

Sets the memory to be used for the model's sidekick container. Only affects image-based models (e.g. MLFlow models) in a deployment.

Parameters
  • ModelVersion model_version: The sidekick model to configure.
  • str memory_spec: Specification of amount of memory (e.g., "2Gi", "500Mi") to use in this sidekick.
Returns

This DeploymentConfigBuilder instance for chaining.

def sidekick_env( self, model_version: wallaroo.model_version.ModelVersion, environment: Dict[str, str]) -> wallaroo.deployment_config.DeploymentConfigBuilder:

Sets the environment variables to be set for the model's sidekick container. Only affects image-based models (e.g. MLFlow models) in a deployment.

Parameters
  • ModelVersion model_version: The sidekick model to configure.
  • Dict[str, str] environment: Dictionary of environment variables names and their corresponding values to be set in the sidekick container.
Returns

This DeploymentConfigBuilder instance for chaining.

def sidekick_arch( self, model_version: wallaroo.model_version.ModelVersion, architecture: wallaroo.engine_config.Architecture) -> wallaroo.deployment_config.DeploymentConfigBuilder:

Sets the machine architecture for the model's sidekick container. Only affects image-based models (e.g. MLFlow models) in a deployment.

Parameters
  • ModelVersion model_version: The sidekick model to configure.
  • Architecture architecture: Machine architecture for this sidekick.
Returns

This DeploymentConfigBuilder instance for chaining.
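Since every builder method returns the DeploymentConfigBuilder for chaining, a configuration can be assembled fluently. A sketch (the resource figures are illustrative, and build() as the finalizer producing a DeploymentConfig is an assumption):

```python
from wallaroo.deployment_config import DeploymentConfigBuilder

config = (
    DeploymentConfigBuilder()
    .replica_autoscale_min_max(maximum=3, minimum=1)
    .autoscale_cpu_utilization(75)   # scale when average CPU exceeds 75%
    .cpus(2)
    .memory("2Gi")
    .build()                         # assumed finalizer
)
```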

11 - wallaroo.engine_config

class Architecture(builtins.str, enum.Enum):

An Enum to represent the supported processor architecture.

X86 = <Architecture.X86: 'x86'>
ARM = <Architecture.ARM: 'arm'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class EngineConfig:

Wraps an engine config.

EngineConfig( cpus: int, gpus: Optional[int] = 0, inference_channel_size: Optional[int] = None, model_concurrency: Optional[int] = None, pipeline_config_directory: Optional[str] = None, model_config_directory: Optional[str] = None, model_directory: Optional[str] = None, audit_logging: bool = False, standalone: bool = False, arch: wallaroo.engine_config.Architecture = <Architecture.X86: 'x86'>)
def to_json(self) -> str:

Returns a json representation of this object

def to_yaml(self) -> str:

Returns a yaml representation of this object for use with standalone mode

12 - wallaroo.expression

13 - wallaroo.framework

class Framework(builtins.str, enum.Enum):

An Enum to represent the supported frameworks.

ONNX = <Framework.ONNX: 'onnx'>
TENSORFLOW = <Framework.TENSORFLOW: 'tensorflow'>
PYTHON = <Framework.PYTHON: 'python'>
KERAS = <Framework.KERAS: 'keras'>
SKLEARN = <Framework.SKLEARN: 'sklearn'>
PYTORCH = <Framework.PYTORCH: 'pytorch'>
XGBOOST = <Framework.XGBOOST: 'xgboost'>
HUGGING_FACE_AUTOMATIC_SPEECH_RECOGNITION = <Framework.HUGGING_FACE_AUTOMATIC_SPEECH_RECOGNITION: 'hugging-face-automatic-speech-recognition'>
HUGGING_FACE_FEATURE_EXTRACTION = <Framework.HUGGING_FACE_FEATURE_EXTRACTION: 'hugging-face-feature-extraction'>
HUGGING_FACE_IMAGE_CLASSIFICATION = <Framework.HUGGING_FACE_IMAGE_CLASSIFICATION: 'hugging-face-image-classification'>
HUGGING_FACE_IMAGE_SEGMENTATION = <Framework.HUGGING_FACE_IMAGE_SEGMENTATION: 'hugging-face-image-segmentation'>
HUGGING_FACE_IMAGE_TO_TEXT = <Framework.HUGGING_FACE_IMAGE_TO_TEXT: 'hugging-face-image-to-text'>
HUGGING_FACE_OBJECT_DETECTION = <Framework.HUGGING_FACE_OBJECT_DETECTION: 'hugging-face-object-detection'>
HUGGING_FACE_QUESTION_ANSWERING = <Framework.HUGGING_FACE_QUESTION_ANSWERING: 'hugging-face-question-answering'>
HUGGING_FACE_STABLE_DIFFUSION_TEXT_2_IMG = <Framework.HUGGING_FACE_STABLE_DIFFUSION_TEXT_2_IMG: 'hugging-face-stable-diffusion-text-2-img'>
HUGGING_FACE_SUMMARIZATION = <Framework.HUGGING_FACE_SUMMARIZATION: 'hugging-face-summarization'>
HUGGING_FACE_TEXT_CLASSIFICATION = <Framework.HUGGING_FACE_TEXT_CLASSIFICATION: 'hugging-face-text-classification'>
HUGGING_FACE_TRANSLATION = <Framework.HUGGING_FACE_TRANSLATION: 'hugging-face-translation'>
HUGGING_FACE_ZERO_SHOT_CLASSIFICATION = <Framework.HUGGING_FACE_ZERO_SHOT_CLASSIFICATION: 'hugging-face-zero-shot-classification'>
HUGGING_FACE_ZERO_SHOT_IMAGE_CLASSIFICATION = <Framework.HUGGING_FACE_ZERO_SHOT_IMAGE_CLASSIFICATION: 'hugging-face-zero-shot-image-classification'>
HUGGING_FACE_ZERO_SHOT_OBJECT_DETECTION = <Framework.HUGGING_FACE_ZERO_SHOT_OBJECT_DETECTION: 'hugging-face-zero-shot-object-detection'>
HUGGING_FACE_SENTIMENT_ANALYSIS = <Framework.HUGGING_FACE_SENTIMENT_ANALYSIS: 'hugging-face-sentiment-analysis'>
HUGGING_FACE_TEXT_GENERATION = <Framework.HUGGING_FACE_TEXT_GENERATION: 'hugging-face-text-generation'>
CUSTOM = <Framework.CUSTOM: 'custom'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans

14 - wallaroo.functions

15 - wallaroo.inference_decode

def convert_to_np_dtype(dtype):
def to_nd_array_list(outputs: List[Dict[str, Any]]) -> List[numpy.ndarray]:
def decode_inference_result(entry: Dict[str, Any]) -> List[Dict[str, Any]]:

Decode inference results. Since they have a potentially rich structure, this could become a substantial effort in the future.

TODO: Support multiple outputs
TODO: Support multiple data types

def flatten_tensor(prefix: str, numeric_list: list) -> Dict[str, numbers.Number]:

Converts a possibly multidimensional list of numbers into a dict where each item in the list is represented by a key/value pair. Does not maintain dimensions, since dataframes are 2d. Does not maintain/manage types, since it should work for any type supported by numpy.

For example, [1,2,3] => {prefix_0: 1, prefix_1: 2, prefix_2: 3} and [[1,2],[3,4]] => {prefix_0_0: 1, prefix_0_1: 2, prefix_1_0: 3, prefix_1_1: 4}.
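The flattening described above can be sketched as a recursive walk over the nested list; this stand-alone re-implementation is for illustration only and the SDK's own flatten_tensor may differ in details:

```python
from numbers import Number
from typing import Dict

def flatten_tensor(prefix: str, numeric_list: list) -> Dict[str, Number]:
    """Flatten a possibly nested list into {prefix_i, prefix_i_j, ...} pairs."""
    out: Dict[str, Number] = {}
    for i, item in enumerate(numeric_list):
        key = f"{prefix}_{i}"
        if isinstance(item, list):
            # Recurse, extending the key with this dimension's index.
            out.update(flatten_tensor(key, item))
        else:
            out[key] = item
    return out

# [1, 2, 3]       -> {prefix_0: 1, prefix_1: 2, prefix_2: 3}
# [[1, 2], [3, 4]] -> {prefix_0_0: 1, prefix_0_1: 2, prefix_1_0: 3, prefix_1_1: 4}
```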

def flatten_dict(prefix: str, input_dict: Dict) -> Dict[str, Any]:

Recursively flattens the input dict, setting the values on the output dict. Assumes simple value types (str, numbers, dicts, and lists). If a value is a dict it is flattened recursively. If a value is a list each item is set as a new k, v pair.

def inference_logs_to_dataframe(logs: List[Dict[str, Any]]) -> pandas.core.frame.DataFrame:

Very similar to dict_list_to_dataframe, but specific to inference logs, since they have hierarchical input and output fields/structures that must be treated in particular ways.

def nested_df_to_flattened_df(orig: pandas.core.frame.DataFrame) -> pandas.core.frame.DataFrame:
def dict_list_to_dataframe(assay_results: List[Dict[str, Any]]) -> pandas.core.frame.DataFrame:

Primarily for assay result lists but can be used for any list of simple dicts.

16 - wallaroo.inference_result

class InferenceResult:
InferenceResult(gql_client: Optional[gql.client.Client], data: Dict[str, Any])

Initializes an InferenceResult.

Parameters
  • gql.Client gql_client: GQL client that this can pass to created objects.
  • Dict[str, Any] data: Response parsed from JSON inference result body.
def data(self) -> List[numpy.ndarray]:

Returns the inference result data.

def model_version(self) -> Tuple[str, str]:

Returns the model this inference was generated by.

def time_elapsed(self) -> datetime.timedelta:

Returns the total time taken by the engine.

def timestamp(self) -> datetime.datetime:

Returns the time at which this inference occurred.

def input_data(self) -> Dict[str, Any]:

Returns the input data for this inference result.

def shadow_data(self) -> Optional[Dict[str, numpy.ndarray]]:

17 - wallaroo.logs

def fetch_plateau_logs(server: str, topic: str, limit: int = 100):
class LogEntry:

Wraps a single log entry.

This class is highly experimental, is unsupported/untested, and may change/disappear in the near future.

LogEntry(entry: Dict[str, Any])
class LogEntries(typing.List[wallaroo.logs.LogEntry]):

Wraps a list of log entries.

This class is highly experimental, is unsupported/untested, and may change/disappear in the near future.

Inherited Members
builtins.list
list
clear
copy
append
insert
extend
pop
remove
index
count
reverse
sort
class LogEntriesShadowDeploy(typing.List[wallaroo.logs.LogEntry]):

Wraps a list of log entries.

This class is highly experimental, is unsupported/untested, and may change/disappear in the near future.

LogEntriesShadowDeploy(logs: wallaroo.logs.LogEntries)
Inherited Members
builtins.list
clear
copy
append
insert
extend
pop
remove
index
count
reverse
sort

18 - wallaroo.model

A Wallaroo Model object. Each Model may have multiple model versions, accessed via .versions()

Model( client: Optional[wallaroo.client.Client], data: Dict[str, Any], standalone=False)

Base constructor.

Each object requires:

  • a GraphQL client - in order to fill its missing members dynamically
  • an initial data blob - typically from deserialized JSON, containing at least the data for required members (typically the object's primary key) and optionally other data members.
def id(self) -> int:
def name(*args, **kwargs):
def owner_id(*args, **kwargs):
def last_update_time(*args, **kwargs):
def created_at(*args, **kwargs):
def versions(*args, **kwargs):
class ModelList(typing.List[wallaroo.model.Model]):

Wraps a list of Models for display in a display-aware environment like Jupyter.

Inherited Members
builtins.list
list
clear
copy
append
insert
extend
pop
remove
index
count
reverse
sort

19 - wallaroo.model_config

class ModelConfig(wallaroo.object.Object):

Wraps a backend ModelConfig object.

ModelConfig( client: Optional[wallaroo.client.Client], data: Dict[str, Any], standalone=False)

Base constructor.

Each object requires:

  • a GraphQL client - in order to fill its missing members dynamically
  • an initial data blob - typically from deserialized JSON, containing at least the data for required members (typically the object's primary key) and optionally other data members.
inputs
outputs
def to_yaml(self):

Generates a yaml file for standalone engines

def to_k8s_yaml(self):
def id(self) -> int:
def filter_threshold(*args, **kwargs):
def model_version(*args, **kwargs):
def runtime(*args, **kwargs):
def tensor_fields(*args, **kwargs):

20 - wallaroo.model_registry

class ModelRegistry(wallaroo.object.Object):

An instance of an external Model Registry. Currently, the only supported registry is Databricks.

ModelRegistry(client: wallaroo.client.Client, data: Dict[str, Any])

Base constructor.

Each object requires:

  • a GraphQL client - in order to fill its missing members dynamically
  • an initial data blob - typically from deserialized JSON, containing at least the data for required members (typically the object's primary key) and optionally other data members.
def name(*args, **kwargs):
def id(*args, **kwargs):
def url(*args, **kwargs):
def created_at(*args, **kwargs):
def updated_at(*args, **kwargs):
def workspaces(*args, **kwargs):
def add_registry_to_workspace(self, workspace_id: int):
def remove_registry_from_workspace(self, workspace_id: int):
def list_models(self):
def upload_model( self, name: str, path: str, framework: wallaroo.framework.Framework, input_schema: pyarrow.lib.Schema, output_schema: pyarrow.lib.Schema, requirements: List[str] = []) -> wallaroo.model_version.ModelVersion:
Parameters
  • name str A descriptive name to set
  • path str The DBFS path to the artifact file to upload.
  • framework Framework The ML framework that this model uses (ex. Framework.ONNX).
  • requirements List[str] An optional list of python requirements needed to run this model.
  • input_schema pa.Schema A PyArrow schema describing inputs to this model.
  • output_schema pa.Schema A PyArrow schema describing the outputs of this model.
Returns

Model An instance of the Wallaroo model that is being created.

class ModelRegistriesList(typing.List[wallaroo.model_registry.ModelRegistry]):

Wraps a list of Model Registries for display in a display-aware environment like Jupyter.

Inherited Members
builtins.list
list
clear
copy
append
insert
extend
pop
remove
index
count
reverse
sort
class RegisteredModelVersion(wallaroo.wallaroo_ml_ops_api_client.models.registered_model_version.RegisteredModelVersion):

An MLFlow version of a [RegisteredModel]. Versions start at 1.

Attributes:
  • creation_timestamp (int): Timestamp in milliseconds from epoch
  • current_stage (str):
  • last_updated_timestamp (int): Timestamp in milliseconds from epoch
  • name (str):
  • run_id (str):
  • source (str):
  • status (str):
  • version (str):
  • description (Union[Unset, None, str]):
  • run_link (Union[Unset, None, str]):
  • user_id (Union[Unset, None, str]):

RegisteredModelVersion( client: wallaroo.client.Client, data: wallaroo.wallaroo_ml_ops_api_client.models.registered_model_version.RegisteredModelVersion, registry_id: str)

Method generated by attrs for class RegisteredModelVersion.

updated_at

def upload(self, framework: Framework, requirements: List[str] = []):

Uploads this Model's source code to your Wallaroo instance.

Parameters
  • framework Framework The ML framework that this model uses (ex. Framework.ONNX).
  • requirements List[str] An optional list of python requirements needed to run this model.
Returns

Model An instance of the Wallaroo model that is being created.
def list_artifacts(self):
Inherited Members
wallaroo.wallaroo_ml_ops_api_client.models.registered_model_version.RegisteredModelVersion
to_dict
from_dict
additional_keys
class DbfsFileList(typing.List[wallaroo.wallaroo_ml_ops_api_client.models.dbfs_list_response_file_with_full_path.DbfsListResponseFileWithFullPath]):

Built-in mutable sequence.

If no argument is given, the constructor creates a new empty list. The argument must be an iterable if specified.

Inherited Members
builtins.list
list
clear
copy
append
insert
extend
pop
remove
index
count
reverse
sort
class RegisteredModelVersionsList(typing.List[wallaroo.model_registry.RegisteredModelVersion]):

Wraps a list of Registered Model Versions for display in a display-aware environment like Jupyter.

Inherited Members
builtins.list
list
clear
copy
append
insert
extend
pop
remove
index
count
reverse
sort
class RegisteredModel:

This is a reference to a Registered Model in an external registry.

RegisteredModel( client: wallaroo.client.Client, data: wallaroo.wallaroo_ml_ops_api_client.models.registered_model.RegisteredModel, registry_id: str)
def versions(self):
def list_artifacts(self):
class RegisteredModels(typing.List[wallaroo.model_registry.RegisteredModel]):

This is a list of Registered Models in an external registry.

Inherited Members
builtins.list
list
clear
copy
append
insert
extend
pop
remove
index
count
reverse
sort

21 - wallaroo.model_status

def model_status_to_string( status: wallaroo.wallaroo_ml_ops_api_client.models.model_status.ModelStatus) -> str:
def is_attempting_load( status: wallaroo.wallaroo_ml_ops_api_client.models.model_status.ModelStatus) -> bool:

22 - wallaroo.model_version

class ModelVersion(wallaroo.object.Object):

The Wallaroo ModelVersion object. Each ModelVersion is a specific version of a Model.

ModelVersion( client: Optional[wallaroo.client.Client], data: Dict[str, Any], standalone=False)

Base constructor.

Each object requires:

  • a GraphQL client - in order to fill its missing members dynamically
  • an initial data blob - typically from deserialized JSON, containing at least the data for required members (typically the object's primary key) and optionally other data members.
def id(self) -> int:
def uid(self) -> str:
def name(*args, **kwargs):
def version(*args, **kwargs):
def models_pk_id(*args, **kwargs):
def sha(*args, **kwargs):
def status(*args, **kwargs):
def file_name(*args, **kwargs):
def image_path(*args, **kwargs):
def arch(*args, **kwargs):
def last_update_time(*args, **kwargs):
inputs
outputs
def tags(*args, **kwargs):
def rehydrate_config(*args, **kwargs):
def configure( self, runtime: Optional[str] = None, tensor_fields: Optional[List[str]] = None, filter_threshold: Optional[float] = None, input_schema: Optional[pyarrow.lib.Schema] = None, output_schema: Optional[pyarrow.lib.Schema] = None, batch_config: Optional[str] = None) -> wallaroo.model_version.ModelVersion:
def logs( self, limit: int = 100, valid: Optional[bool] = None, arrow: Optional[bool] = False) -> Tuple[Any, Optional[str]]:
def deploy( self, pipeline_name: str, deployment_config: Optional[wallaroo.deployment_config.DeploymentConfig] = None) -> wallaroo.pipeline.Pipeline:

Convenience function to quickly deploy a Model. It will configure the model, create a pipeline with a single model step, deploy it, and return the pipeline.

Typically, the configure() method is used to configure a model prior to deploying it. However, if a default configuration is sufficient, this function can be used to quickly deploy with said default configuration.

The filename this Model was generated from needs to have a recognizable file extension so that the runtime can be inferred. Currently, this is:

  • .onnx -> ONNX runtime
Parameters
  • str pipeline_name: Name of the pipeline to create. Must be unique across all pipelines. Names must be ASCII alpha-numeric characters plus dash (-) only.
class ModelVersionList(typing.List[wallaroo.model_version.ModelVersion]):

Wraps a list of model versions for display in a display-aware environment like Jupyter.

Inherited Members
builtins.list
list
clear
copy
append
insert
extend
pop
remove
index
count
reverse
sort

23 - wallaroo.notify

class Notification:
Notification()
def to_json(self):
class Email(Notification):
Email(to)
def to_json(self):
@classmethod
def from_json(cls, json):
def to_json(notifications):
def from_json(json):
class AlertConfiguration:
AlertConfiguration(name, expression, notifications)
@classmethod
def from_json(Cls, json):
def to_json(self):

24 - wallaroo.object

class DehydratedValue:

Represents a not-set sentinel value.

Attributes that are null in the database will be returned as None in Python, and we want them to be set as such, so None cannot be used as a sentinel value signaling that an optional attribute is not yet set. Objects of this class fill that role instead.

DehydratedValue()
def rehydrate(attr):

Decorator that rehydrates the named attribute if needed.

This should decorate getter calls for an attribute:

@rehydrate(_foo_attr)
def foo_attr(self):
    return self._foo_attr

This will cause the API object to "rehydrate" (perform a query to fetch and fill in all attributes from the database) if the named attribute is not set.
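
The rehydration pattern can be sketched like this (a simplified stand-in: here the attribute name is passed as a string, and the GraphQL fetch is faked):

```python
class DehydratedValue:
    # Sentinel marking an attribute that has not been fetched yet.
    pass

def rehydrate(attr):
    # Decorator factory: wrap a getter so that accessing a dehydrated
    # attribute first triggers a full fetch.
    def decorator(getter):
        def wrapper(self):
            if isinstance(getattr(self, attr), DehydratedValue):
                self._rehydrate()  # stand-in for the GraphQL query
            return getter(self)
        return wrapper
    return decorator

class Record:
    def __init__(self):
        self._foo_attr = DehydratedValue()  # not yet loaded

    def _rehydrate(self):
        # In the real object this performs a query and fills all attributes.
        self._foo_attr = "loaded"

    @rehydrate("_foo_attr")
    def foo_attr(self):
        return self._foo_attr
```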

def value_if_present( data: Dict[str, Any], path: str) -> Union[Any, wallaroo.object.DehydratedValue]:

Returns a value in a nested dictionary, or DehydratedValue.

Parameters
  • str path: Dot-delimited path within a nested dictionary; e.g. foo.bar.baz
Returns

The requested value inside the dictionary, or DehydratedValue if it doesn't exist.
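
A sketch of the dot-path lookup under these semantics (illustrative only; the local DehydratedValue stands in for wallaroo.object.DehydratedValue):

```python
from typing import Any, Dict

class DehydratedValue:
    # Local stand-in for the not-set sentinel.
    pass

def value_if_present(data: Dict[str, Any], path: str) -> Any:
    # Walk the dot-delimited path; bail out with the sentinel if any
    # intermediate key is missing or not a dict.
    current: Any = data
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return DehydratedValue()
        current = current[key]
    return current
```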

class RequiredAttributeMissing(builtins.Exception):

Raised when an API object is initialized without a required attribute.

RequiredAttributeMissing(class_name: str, attribute_name: str)
Inherited Members
builtins.BaseException
with_traceback
args
class ModelUploadError(builtins.Exception):

Raised when a model file fails to upload.

ModelUploadError(e)
Inherited Members
builtins.BaseException
with_traceback
args
class ModelConversionError(builtins.Exception):

Raised when a model file fails to convert.

ModelConversionError(e)
Inherited Members
builtins.BaseException
with_traceback
args
class ModelConversionTimeoutError(builtins.Exception):

Raised when a model conversion takes longer than 10 minutes.

ModelConversionTimeoutError(e)
Inherited Members
builtins.BaseException
with_traceback
args
class EntityNotFoundError(builtins.Exception):

Raised when a query for a specific API object returns no results.

This is specifically for queries by unique identifiers that are expected to return exactly one result; queries that can return 0 to many results should return empty list instead of raising this exception.

EntityNotFoundError(entity_type: str, params: Dict[str, str])
Inherited Members
builtins.BaseException
with_traceback
args
class LimitError(builtins.Exception):

Raised when deployment fails.

LimitError(e)
Inherited Members
builtins.BaseException
with_traceback
args
class UserLimitError(builtins.Exception):

Raised when a community instance has hit the user limit

UserLimitError()
Inherited Members
builtins.BaseException
with_traceback
args
class DeploymentError(builtins.Exception):

Raised when deployment fails.

DeploymentError(e)
Inherited Members
builtins.BaseException
with_traceback
args
class InferenceError(builtins.Exception):

Raised when inference fails.

InferenceError(error)
Inherited Members
builtins.BaseException
with_traceback
args
class InvalidNameError(builtins.Exception):

Raised when an entity's name does not meet the expected criteria.

Parameters
  • str name: the name string that is invalid
  • str req: a string description of the requirement
InvalidNameError(name: str, req: str)
Inherited Members
builtins.BaseException
with_traceback
args
class CommunicationError(builtins.Exception):

Raised when some component cannot be contacted. There is a networking, configuration or installation problem.

CommunicationError(e)
Inherited Members
builtins.BaseException
with_traceback
args
class Object(abc.ABC):

Base class for all backend GraphQL API objects.

This class serves as a framework for API objects to be constructed based on a partially-complete JSON response, and to fill in their remaining members dynamically if needed.

Object( gql_client: Optional[gql.client.Client], data: Dict[str, Any], standalone=False)

Base constructor.

Each object requires:

  • a GraphQL client - in order to fill its missing members dynamically
  • an initial data blob - typically from deserialized JSON, containing at least the data for required members (typically the object's primary key) and optionally other data members.

25 - wallaroo.orchestration

class Orchestration(wallaroo.object.Object):

An Orchestration object that represents some user-defined code that has been packaged into a container and can be deployed.

Orchestration( client: wallaroo.client.Client, data: Dict[str, Any], standalone=False)

Base constructor.

Each object requires:

  • a GraphQL client - in order to fill its missing members dynamically
  • an initial data blob - typically from deserialized JSON, containing at least the data for required members (typically the object's primary key) and optionally other data members.
@staticmethod
def list_orchestrations( client: wallaroo.client.Client) -> List[wallaroo.orchestration.Orchestration]:
@staticmethod
def upload( client: wallaroo.client.Client, name: Optional[str] = None, bytes_buffer: Optional[bytes] = None, path: Optional[str] = None, file_name: Optional[str] = None) -> wallaroo.orchestration.Orchestration:
def run_once( self, name: Optional[str], json_args: Dict[Any, Any] = {}, timeout: Optional[int] = None, debug: Optional[bool] = False):

Runs this Orchestration once.

Parameters
  • name str A descriptive identifier for this run.
  • json_args Dict[Any, Any] A JSON object containing deploy-specific arguments.
  • timeout Optional[int] A timeout in seconds. Any instance of this orchestration that is running for longer than this specified time will be automatically killed.
  • debug Optional[bool] Produce extra debugging output about the run
Returns

A metadata object associated with the deploy Task

def run_scheduled( self, name: str, schedule: str, json_args: Dict[Any, Any] = {}, timeout: Optional[int] = None, debug: Optional[bool] = False):

Runs this Orchestration on a cron schedule.

Parameters
  • name str A descriptive identifier for this run.
  • schedule str A cron-style scheduling string, e.g. "* * * * *" (every minute) or "*/15 * * * *" (every 15 minutes).
  • json_args Dict[Any, Any] A JSON object containing deploy-specific arguments.
  • timeout Optional[int] A timeout in seconds. Any single instance of this orchestration that is running for longer than this specified time will be automatically killed. Future runs will still be scheduled.
  • debug Optional[bool] Produce extra debugging output about the run
Returns

A metadata object associated with the deploy Task

def id(*args, **kwargs):
def name(*args, **kwargs):
def file_name(*args, **kwargs):
def sha(*args, **kwargs):
def status(*args, **kwargs):
def created_at(*args, **kwargs):
def updated_at(*args, **kwargs):
def workspace_id(*args, **kwargs):
def task(*args, **kwargs):
def list_tasks(self):
class OrchestrationList(typing.List[wallaroo.orchestration.Orchestration]):

Wraps a list of orchestrations for display in a display-aware environment like Jupyter.

Inherited Members
builtins.list
list
clear
copy
append
insert
extend
pop
remove
index
count
reverse
sort
class OrchestrationUploadFailed(builtins.Exception):

Raised when uploading an Orchestration fails due to a backend issue.

OrchestrationUploadFailed(e)
Inherited Members
builtins.BaseException
with_traceback
args
class OrchestrationMissingFile(builtins.Exception):

Raised when uploading an Orchestration without providing a file-like object.

OrchestrationMissingFile()
Inherited Members
builtins.BaseException
with_traceback
args
class OrchestrationDeployOneshotFailed(builtins.Exception):

Raised when deploying an Orchestration fails due to a backend issue.

OrchestrationDeployOneshotFailed(e)
Inherited Members
builtins.BaseException
with_traceback
args

26 - wallaroo.pipeline

def update_timestamp(f):
class Pipeline(wallaroo.object.Object):

A pipeline is an execution context for models. Pipelines contain Steps, which are often Models. Pipelines can be deployed or un-deployed.

Pipeline(client: Optional[wallaroo.client.Client], data: Dict[str, Any])

Base constructor.

Each object requires:

  • a GraphQL client - in order to fill its missing members dynamically
  • an initial data blob - typically from deserialized JSON, containing at least the data for required members (typically the object's primary key) and optionally other data members.
def id(self) -> int:
def owner_id(*args, **kwargs):
def create_time(*args, **kwargs):
def last_update_time(*args, **kwargs):
def name(*args, **kwargs):
def versions(*args, **kwargs):
def tags(*args, **kwargs):
def get_pipeline_configuration(self, version: Optional[str] = None) -> Dict[str, Any]:

Get a pipeline configuration for a specific version.

Parameters
  • version: str Version of the pipeline.
Returns

Dict[str, Any] Pipeline configuration.
def logs( self, limit: Optional[int] = None, start_datetime: Optional[datetime.datetime] = None, end_datetime: Optional[datetime.datetime] = None, valid: Optional[bool] = None, dataset: Optional[List[str]] = None, dataset_exclude: Optional[List[str]] = None, dataset_separator: Optional[str] = None, arrow: Optional[bool] = False) -> Union[wallaroo.logs.LogEntries, pyarrow.lib.Table, pandas.core.frame.DataFrame]:

Get inference logs for this pipeline.

Parameters
  • limit: Optional[int]: Maximum number of logs to return.
  • start_datetime: Optional[datetime.datetime]: Start time for logs.
  • end_datetime: Optional[datetime.datetime]: End time for logs.
  • valid: Optional[bool]: If set to False, will include logs for failed inferences
  • dataset: Optional[List[str]] By default this is set to ["*"], which returns ["time", "in", "out", "check_failures"]. Other available options: ["metadata"]
  • dataset_exclude: Optional[List[str]] If set, allows user to exclude parts of dataset.
  • dataset_separator: Optional[Union[Sequence[str], str]] If set to ".", return dataset will be flattened.
  • arrow: Optional[bool] If set to True, return logs as an Arrow Table. Else, returns Pandas DataFrame.
Returns

Union[LogEntries, pa.Table, pd.DataFrame]

def export_logs( self, directory: Optional[str] = None, file_prefix: Optional[str] = None, data_size_limit: Optional[str] = None, limit: Optional[int] = None, start_datetime: Optional[datetime.datetime] = None, end_datetime: Optional[datetime.datetime] = None, valid: Optional[bool] = None, dataset: Optional[List[str]] = None, dataset_exclude: Optional[List[str]] = None, dataset_separator: Optional[str] = None, arrow: Optional[bool] = False) -> None:

Export logs to a user provided local file.

Parameters
  • directory: Optional[str] Logs will be exported to a file in the given directory. By default, logs will be exported to new "logs" subdirectory in current working directory.
  • file_prefix: Optional[str] Prefix to name the exported file. By default, the file_prefix will be set to the pipeline name.
  • data_size_limit: Optional[str] The maximum size of the exported data in bytes. Size includes all files within the provided directory. By default, the data_size_limit will be set to 100MB.
  • limit: Optional[int] The maximum number of logs to return.
  • start_datetime: Optional[datetime.datetime] The start time to filter logs by.
  • end_datetime: Optional[datetime.datetime] The end time to filter logs by.
  • valid: Optional[bool] If set to False, will return logs for failed inferences.
  • dataset: Optional[List[str]] By default this is set to ["*"], which returns ["time", "in", "out", "check_failures"]. Other available options: ["metadata"]
  • dataset_exclude: Optional[List[str]] If set, allows user to exclude parts of dataset.
  • dataset_separator: Optional[Union[Sequence[str], str]] If set to ".", return dataset will be flattened.
  • arrow: Optional[bool] If set to True, return logs as an Arrow Table. Else, returns Pandas DataFrame.
Returns

None
def logs_shadow_deploy(self):
def url(self) -> str:

Returns the inference URL for this pipeline.

def deploy( self, pipeline_name: Optional[str] = None, deployment_config: Optional[wallaroo.deployment_config.DeploymentConfig] = None) -> wallaroo.pipeline.Pipeline:

Deploy pipeline. pipeline_name is optional if deploy was called previously. When specified, pipeline_name must be ASCII alpha-numeric characters, plus dash (-) only.

def definition(self) -> str:

Get the current definition of the pipeline as a string

def get_topic_name(self) -> str:
def undeploy(self) -> wallaroo.pipeline.Pipeline:
def infer(self, *args, **kwargs):

Returns an inference result on this deployment, given a tensor.

Parameters
  • tensor: Union[Dict[str, Any], pd.DataFrame, pa.Table] Inference data.
  • timeout: Optional[Union[int, float]] infer requests will time out after the amount of seconds provided are exceeded. timeout defaults to 15 secs.
  • dataset: Optional[List[str]] By default this is set to ["*"], which returns ["time", "in", "out", "check_failures"]. Other available options: ["metadata"]
  • dataset_exclude: Optional[List[str]] If set, allows user to exclude parts of dataset.
  • dataset_separator: Optional[Union[Sequence[str], str]] If set to ".", return dataset will be flattened.
Returns

InferenceResult in dictionary, dataframe or arrow format.

def infer_from_file(self, *args, **kwargs):

Returns an inference result on this deployment, given tensors in a file.

async def async_infer( self, tensor: Union[Dict[str, Any], pandas.core.frame.DataFrame, pyarrow.lib.Table], async_client: httpx.AsyncClient, timeout: Union[int, float, NoneType] = None, retries: Optional[int] = None, dataset: Optional[List[str]] = None, dataset_exclude: Optional[List[str]] = None, dataset_separator: Optional[str] = None):

Runs an async inference and returns an inference result on this deployment, given a tensor.

Parameters
  • tensor: Union[Dict[str, Any], pd.DataFrame, pa.Table] Inference data.
  • async_client: AsyncClient Async client to use for async inference.
  • timeout: Optional[Union[int, float]] infer requests will time out after the amount of seconds provided are exceeded. timeout defaults to 15 secs.
  • retries: Optional[int] Number of retries to use in case of Connection errors.
  • job_id: Optional[int] Job id to use for async inference.
  • dataset: Optional[List[str]] By default this is set to ["*"], which returns ["time", "in", "out", "check_failures"]. Other available options: ["metadata"]
  • dataset_exclude: Optional[List[str]] If set, allows user to exclude parts of dataset.
  • dataset_separator: Optional[Union[Sequence[str], str]] If set to ".", return dataset will be flattened.
async def parallel_infer( self, tensor_list: List[Union[Dict[str, Any], List[Any], pandas.core.frame.DataFrame, pyarrow.lib.Table]], timeout: Union[int, float, NoneType] = None, num_parallel: Optional[int] = None, retries: Optional[int] = None, dataset: Optional[List[str]] = None, dataset_exclude: Optional[List[str]] = None, dataset_separator: Optional[str] = None):

Runs parallel inferences and returns a list of inference results on latest deployment.

Parameters
  • tensor_list: List[Union[Dict[str, Any], List[Any], pd.DataFrame, pa.Table]] List of inference data.
  • timeout: Optional[Union[int, float]] infer requests will time out after the amount of seconds provided are exceeded. timeout defaults to 15 secs.
  • num_parallel: Optional[int] Maximum number of concurrent inference requests (enforced with a semaphore).
  • retries: Optional[int] Number of retries to use in case of Connection errors.
  • dataset: Optional[List[str]] By default this is set to ["*"], which returns ["time", "in", "out", "check_failures"]. Other available options: ["metadata"]
  • dataset_exclude: Optional[List[str]] If set, allows user to exclude parts of dataset.
  • dataset_separator: Optional[Union[Sequence[str], str]] If set to ".", return dataset will be flattened.
def status(self) -> Dict[str, Any]:

Status of pipeline

def steps(self) -> List[wallaroo.pipeline_config.Step]:

Returns a list of the steps of a pipeline. Not exactly a shim

def model_configs(self) -> List[wallaroo.model_config.ModelConfig]:

Returns a list of the model configs of a pipeline. Not exactly a shim

def remove_step(self, index: int) -> wallaroo.pipeline.Pipeline:

Remove a step at a given index

def add_model_step( self, model_version: wallaroo.model_version.ModelVersion) -> wallaroo.pipeline.Pipeline:

Perform inference with a single model.

def replace_with_model_step( self, index: int, model_version: wallaroo.model_version.ModelVersion) -> wallaroo.pipeline.Pipeline:

Replaces the step at the given index with a model step

def add_multi_model_step( self, model_version_list: Iterable[wallaroo.model_version.ModelVersion]) -> wallaroo.pipeline.Pipeline:

Perform inference on the same input data for any number of models.

def replace_with_multi_model_step( self, index: int, model_version_list: Iterable[wallaroo.model_version.ModelVersion]) -> wallaroo.pipeline.Pipeline:

Replaces the step at the index with a multi model step

def add_audit(self, slice) -> wallaroo.pipeline.Pipeline:

Run audit logging on a specified slice of model outputs.

The slice must be in python-like format. start:, start:end, and :end are supported.
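
The supported slice strings behave like Python slicing; a sketch of how such a string could be applied to a list of outputs (hypothetical helper, not the pipeline's internals):

```python
def apply_audit_slice(outputs, slice_str):
    # Parse "start:", "start:end", or ":end" and apply it as a Python slice.
    start_s, _, end_s = slice_str.partition(":")
    start = int(start_s) if start_s else None
    end = int(end_s) if end_s else None
    return outputs[start:end]
```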

def replace_with_audit(self, index: int, audit_slice: str) -> wallaroo.pipeline.Pipeline:

Replaces the step at the index with an audit step

def add_select(self, index: int) -> wallaroo.pipeline.Pipeline:

Select only the model output with the given index from an array of outputs.

def replace_with_select(self, step_index: int, select_index: int) -> wallaroo.pipeline.Pipeline:

Replaces the step at the index with a select step

def add_key_split( self, default: wallaroo.model_version.ModelVersion, meta_key: str, options: Dict[str, wallaroo.model_version.ModelVersion]) -> wallaroo.pipeline.Pipeline:

Split traffic based on the value at a given meta_key in the input data, routing to the appropriate model.

If the resulting value is a key in options, the corresponding model is used. Otherwise, the default model is used for inference.
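
The routing rule reduces to a dictionary lookup with a fallback; a minimal sketch (models shown as plain strings for illustration):

```python
def route_by_key(input_data, meta_key, options, default):
    # Use the value stored at meta_key to pick a model; unknown or
    # missing values fall back to the default model.
    value = input_data.get(meta_key)
    return options.get(value, default)
```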

def replace_with_key_split( self, index: int, default: wallaroo.model_version.ModelVersion, meta_key: str, options: Dict[str, wallaroo.model_version.ModelVersion]) -> wallaroo.pipeline.Pipeline:

Replace the step at the index with a key split step

def add_random_split( self, weighted: Iterable[Tuple[float, wallaroo.model_version.ModelVersion]], hash_key: Optional[str] = None) -> wallaroo.pipeline.Pipeline:

Routes inputs to a single model, randomly chosen from the list of weighted options.

Each model receives inputs that are approximately proportional to the weight it is assigned. For example, with two models having weights 1 and 1, each will receive roughly equal amounts of inference inputs. If the weights were changed to 1 and 2, the models would receive roughly 33% and 66% respectively instead.

When choosing the model to use, a random number between 0.0 and 1.0 is generated. The weighted inputs are mapped to that range, and the random input is then used to select the model to use. For example, for the two-models equal-weight case, a random key of 0.4 would route to the first model. 0.6 would route to the second.

To support consistent assignment to a model, a hash_key can be specified. The value at this key must be between 0.0 and 1.0; when present in the input data, it will be used instead of a random number for model selection.
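
The weighted selection described above can be sketched as mapping a draw in [0, 1) onto cumulative weight ranges (illustrative; models shown as strings):

```python
def pick_model(weighted, r):
    # weighted: list of (weight, model) pairs; r: a random draw, or the
    # value found at hash_key when consistent assignment is wanted.
    total = sum(weight for weight, _ in weighted)
    cumulative = 0.0
    for weight, model in weighted:
        cumulative += weight / total  # normalize so ranges cover [0, 1]
        if r < cumulative:
            return model
    return weighted[-1][1]  # guard against floating-point round-off
```

With equal weights [(1, "a"), (1, "b")], a draw of 0.4 routes to "a" and 0.6 routes to "b", matching the example above.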

def replace_with_random_split( self, index: int, weighted: Iterable[Tuple[float, wallaroo.model_version.ModelVersion]], hash_key: Optional[str] = None) -> wallaroo.pipeline.Pipeline:

Replace the step at the index with a random split step

def add_shadow_deploy( self, champion: wallaroo.model_version.ModelVersion, challengers: Iterable[wallaroo.model_version.ModelVersion]) -> wallaroo.pipeline.Pipeline:

Create a "shadow deployment" experiment pipeline. The champion model and all challengers are run for each input. The result data for all models is logged, but the output of the champion is the only result returned.

This is particularly useful for "burn-in" testing a new model with real world data without displacing the currently proven model.

This is currently implemented as three steps: A multi model step, an audit step, and a select step. To remove or replace this step, you need to remove or replace all three. You can remove steps using pipeline.remove_step

def replace_with_shadow_deploy( self, index: int, champion: wallaroo.model_version.ModelVersion, challengers: Iterable[wallaroo.model_version.ModelVersion]) -> wallaroo.pipeline.Pipeline:

Replace a given step with a shadow deployment

def add_validation( self, name: str, validation: wallaroo.checks.Expression) -> wallaroo.pipeline.Pipeline:

Add a validation with the given name. All validations are run on all outputs, and all failures are logged.

def add_alert( self, name: str, alert: wallaroo.checks.Alert, notifications: List[wallaroo.notify.Notification]) -> wallaroo.pipeline.Pipeline:
def replace_with_alert( self, index: int, name: str, alert: wallaroo.checks.Alert, notifications: List[wallaroo.notify.Notification]) -> wallaroo.pipeline.Pipeline:

Replace the step at the given index with the specified alert

def clear(self) -> wallaroo.pipeline.Pipeline:

Remove all steps from the pipeline. This might be desirable if replacing models, for example.

def list_explainability_configs(self) -> List[wallaroo.explainability.ExplainabilityConfig]:

List the explainability configs we've created.

def get_explainability_config( self, expr: Union[str, wallaroo.explainability.ExplainabilityConfig]) -> wallaroo.explainability.ExplainabilityConfig:

Get the details of an explainability config.

def create_explainability_config(self, feature_names: Sequence[str], num_points=10):

Create a shap config to be used later for reference and adhoc requests.

def publish( self, deployment_config: Optional[wallaroo.deployment_config.DeploymentConfig] = None, replaces: Optional[List[int]] = None):

Create a new version of a pipeline and publish it.

def publishes(self):
def list_edges(self):
def create_version(self) -> wallaroo.pipeline_version.PipelineVersion:

Creates a new PipelineVersion and stores it in the database.

class Pipelines(typing.List[wallaroo.pipeline.Pipeline]):

Wraps a list of pipelines for display in a display-aware environment like Jupyter.

Inherited Members
builtins.list
list
clear
copy
append
insert
extend
pop
remove
index
count
reverse
sort
class Edge(wallaroo.wallaroo_ml_ops_api_client.models.edge.Edge):

The Edge

Attributes
  • cpus (float): Number of CPUs
  • id (str): ID
  • memory (str): Amount of memory (in k8s format)
  • name (str): User-given name
  • tags (List[str]): Edge tags
  • should_run_publish (Union[Unset, None, int]): The pipeline publish ID this edge is supposed to run
  • spiffe_id (Union[Unset, None, str]): Spiffe ID

Inherited Members
wallaroo.wallaroo_ml_ops_api_client.models.edge.Edge
Edge
to_dict
from_dict
additional_keys
class EdgesList(typing.List[wallaroo.pipeline.Edge]):

Wraps a list of edges for display in a display-aware environment like Jupyter.

Inherited Members
builtins.list
list
clear
copy
append
insert
extend
pop
remove
index
count
reverse
sort

27 - wallaroo.pipeline_config

class ValidDataType(builtins.str, enum.Enum):

An enumeration.

f32 = <ValidDataType.f32: 'f32'>
f64 = <ValidDataType.f64: 'f64'>
i8 = <ValidDataType.i8: 'i8'>
u8 = <ValidDataType.u8: 'u8'>
i16 = <ValidDataType.i16: 'i16'>
u16 = <ValidDataType.u16: 'u16'>
i32 = <ValidDataType.i32: 'i32'>
u32 = <ValidDataType.u32: 'u32'>
i64 = <ValidDataType.i64: 'i64'>
u64 = <ValidDataType.u64: 'u64'>
Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans
class ModelConfigsForStep:
ModelConfigsForStep(model_configs: List[wallaroo.model_config.ModelConfig])
class ModelForStep:
ModelForStep(name, version, sha)
def to_json(self):
@classmethod
def from_json(cls, json_dict: Dict[str, str]):
@classmethod
def from_model(cls, model: wallaroo.model_version.ModelVersion):
class ModelWeight:
ModelWeight(weight: float, model: wallaroo.pipeline_config.ModelForStep)
def to_json(self):
@classmethod
def from_json(cls, json_dict: Dict[str, Any]):
@classmethod
def from_tuple(cls, tup: Tuple[float, wallaroo.model_version.ModelVersion]):
class RowToModel:
RowToModel(row_index: int, model: wallaroo.pipeline_config.ModelForStep)
def to_json(self):
@classmethod
def from_json(cls, json_dict: Dict[str, Any]):
class Step:
Step()
def to_json(self):
def is_inference_step(self):
def shortname(self):

A short name to represent this Step in Jupyter tables.

@staticmethod
def from_json(json_dict: Dict):
class Steps(typing.List[wallaroo.pipeline_config.Step]):

Wraps a list of pipeline steps for display in a display-aware environment like Jupyter.

Inherited Members
builtins.list
list
clear
copy
append
insert
extend
pop
remove
index
count
reverse
sort
class Average(Step):
Average()
def to_json(self):
@staticmethod
def from_json(json_dict: Dict):
class AuditResults(Step):
AuditResults(start: int, end: Optional[int] = None)
def to_json(self):
@staticmethod
def from_json(json_dict: Dict):
def shortname(self):

A short name to represent this Step in Jupyter tables.

class Check(Step):
Check(tree: str)
def to_json(self):
@classmethod
def from_name_and_validation( cls, name: str, validation: wallaroo.checks.Expression, gauges: List[str] = []):
@staticmethod
def from_json(json_dict: Dict):
class ColumnsSelect(Step):
ColumnsSelect(columns: List[int])
def to_json(self):
@staticmethod
def from_json(json_dict: Dict):
def shortname(self):

A short name to represent this Step in Jupyter tables.

class ColumnsToRows(Step):
ColumnsToRows()
def to_json(self):
@staticmethod
def from_json(json_dict: Dict):
class InputDataToType(Step):
InputDataToType(data_type: wallaroo.pipeline_config.ValidDataType)
def to_json(self):
@staticmethod
def from_json(json_dict: Dict):
def shortname(self):

A short name to represent this Step in Jupyter tables.

class ModelInference(Step):
ModelInference(models: List[wallaroo.pipeline_config.ModelForStep])
def to_json(self):
@staticmethod
def from_json(json_dict: Dict):
def is_inference_step(self):
def shortname(self):

A short name to represent this Step in Jupyter tables.

class RowsToModels(Step):
RowsToModels(rows_to_models: List[wallaroo.pipeline_config.RowToModel])
def to_json(self):
@staticmethod
def from_json(json_dict: Dict):
def is_inference_step(self):
class Nth(Step):
Nth(index: int)
def to_json(self):
@staticmethod
def from_json(json_dict: Dict):
def shortname(self):

A short name to represent this Step in Jupyter tables.

class MultiOut(Step):
MultiOut()
def to_json(self):
@staticmethod
def from_json(json_dict: Dict):
class MetaValueSplit(Step):
MetaValueSplit( split_key: str, control: wallaroo.pipeline_config.ModelForStep, routes: Dict[str, wallaroo.pipeline_config.ModelForStep])
def to_json(self):
@staticmethod
def from_json(json_dict: Dict):
def is_inference_step(self):
def shortname(self):

A short name to represent this Step in Jupyter tables.

class RandomSplit(Step):
RandomSplit( weights: List[wallaroo.pipeline_config.ModelWeight], hash_key: Optional[str] = None)
def to_json(self):
@staticmethod
def from_json(json_dict: Dict):
def is_inference_step(self):
class PipelineConfig:
PipelineConfig( pipeline_name: str, steps: Iterable[wallaroo.pipeline_config.Step], alert_configurations: Iterable[wallaroo.notify.AlertConfiguration])
@classmethod
def from_json(Klass, json):
def to_json(self):
def to_yaml(self):
class PipelineConfigBuilder:
PipelineConfigBuilder( client: Optional[wallaroo.client.Client], pipeline_name: str, standalone=False)
def upload(self) -> wallaroo.pipeline.Pipeline:
def remove_step(self, index: int):

Remove a step at a given index

def add_model_step( self, model: wallaroo.model_version.ModelVersion) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Perform inference with a single model.

def replace_with_model_step( self, index: int, model: wallaroo.model_version.ModelVersion) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Replaces the step at the given index with a model step

def add_multi_model_step( self, models: Iterable[wallaroo.model_version.ModelVersion]) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Perform inference on the same input data for any number of models.

def replace_with_multi_model_step( self, index: int, models: Iterable[wallaroo.model_version.ModelVersion]) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Replaces the step at the index with a multi model step

def add_audit(self, audit_slice: str) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Run audit logging on a specified slice of model outputs.

The slice must be in Python-like format; start:, start:end, and :end are supported.
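The three accepted forms map directly onto Python's slice semantics; a minimal parser (parse_audit_slice is illustrative, not an SDK function) might look like:

```python
def parse_audit_slice(audit_slice: str) -> slice:
    """Parse a python-like slice string: "start:", "start:end", or ":end"."""
    start_s, _, end_s = audit_slice.partition(":")
    return slice(int(start_s) if start_s else None,
                 int(end_s) if end_s else None)

outputs = ["out0", "out1", "out2", "out3"]
outputs[parse_audit_slice("1:3")]  # → ['out1', 'out2']
outputs[parse_audit_slice(":2")]   # → ['out0', 'out1']
outputs[parse_audit_slice("2:")]   # → ['out2', 'out3']
```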

def replace_with_audit( self, index: int, audit_slice: str) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Replaces the step at the index with an audit step

def add_select(self, index: int) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Select only the model output with the given index from an array of outputs.

def add_multi_out(self):
def replace_with_select( self, step_index: int, select_index: int) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Replaces the step at the index with a select step

def add_key_split( self, default: wallaroo.model_version.ModelVersion, meta_key: str, options: Dict[str, wallaroo.model_version.ModelVersion]) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Split traffic based on the value at a given meta_key in the input data, routing to the appropriate model.

If the resulting value is a key in options, the corresponding model is used. Otherwise, the default model is used for inference.
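The routing rule amounts to a dictionary lookup with a fallback; a hypothetical sketch (route_by_key is illustrative, not an SDK function):

```python
def route_by_key(input_data, meta_key, options, default):
    """Return the model registered for input_data[meta_key],
    or the default model when the value is missing or unknown."""
    return options.get(input_data.get(meta_key), default)

options = {"eu": "model_eu", "us": "model_us"}
route_by_key({"region": "eu"}, "region", options, "model_default")    # → "model_eu"
route_by_key({"region": "apac"}, "region", options, "model_default")  # → "model_default"
```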

def replace_with_key_split( self, index: int, default: wallaroo.model_version.ModelVersion, meta_key: str, options: Dict[str, wallaroo.model_version.ModelVersion]) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Replace the step at the index with a key split step

def add_random_split( self, weighted: Iterable[Tuple[float, wallaroo.model_version.ModelVersion]], hash_key: Optional[str] = None) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Routes inputs to a single model, randomly chosen from the list of weighted options.

Each model receives inputs that are approximately proportional to the weight it is assigned. For example, with two models having weights 1 and 1, each will receive roughly equal amounts of inference inputs. If the weights were changed to 1 and 2, the models would receive roughly 33% and 66% respectively instead.

When choosing the model to use, a random number between 0.0 and 1.0 is generated. The weighted inputs are mapped onto that range, and the random number is then used to select the model. For example, in the two-model equal-weight case, a random value of 0.4 would route to the first model, and 0.6 to the second.

To support consistent assignment to a model, a hash_key can be specified. The value at this key, when present in the input data, must be between 0.0 and 1.0 and is used instead of a random number for model selection.

def replace_with_random_split( self, index: int, weighted: Iterable[Tuple[float, wallaroo.model_version.ModelVersion]], hash_key: Optional[str] = None) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Replace the step at the index with a random split step

def add_shadow_deploy( self, champion: wallaroo.model_version.ModelVersion, challengers: Iterable[wallaroo.model_version.ModelVersion]) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Create a "shadow deployment" experiment pipeline. The champion model and all challengers are run for each input. The result data for all models is logged, but the output of the champion is the only result returned.

This is particularly useful for "burn-in" testing a new model with real world data without displacing the currently proven model.

This is currently implemented as three steps: a multi model step, an audit step, and a select step. To remove or replace this step, you need to remove or replace all three. You can remove steps using pipeline.remove_step.

def replace_with_shadow_deploy( self, index: int, champion: wallaroo.model_version.ModelVersion, challengers: Iterable[wallaroo.model_version.ModelVersion]) -> wallaroo.pipeline_config.PipelineConfigBuilder:
def add_validation( self, name: str, validation: wallaroo.checks.Expression) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Add a validation with the given name. All validations are run on all outputs, and all failures are logged.

def replace_with_validation( self, index: int, name: str, validation: wallaroo.checks.Expression) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Replace the step at the given index with a validation step

def add_alert( self, name: str, alert: wallaroo.checks.Alert, notifications: List[wallaroo.notify.Notification]) -> wallaroo.pipeline_config.PipelineConfigBuilder:
def replace_with_alert( self, index, name: str, alert: wallaroo.checks.Alert, notifications: List[wallaroo.notify.Notification]) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Replace the step at the given index with the specified alert

def clear(self) -> wallaroo.pipeline_config.PipelineConfigBuilder:

Remove all steps from the pipeline. This might be desirable if replacing models, for example.

28 - wallaroo.pipeline_publish

class PipelinePublish(wallaroo.wallaroo_ml_ops_api_client.models.publish_pipeline_response_202.PublishPipelineResponse202):

Attributes
  • created_at (datetime.datetime):
  • docker_run_variables (PublishPipelineResponse202DockerRunVariables):
  • engine_config (EngineConfig):
  • id (int):
  • pipeline_version_id (int):
  • status (PipelinePublishStatus):
  • updated_at (datetime.datetime):
  • user_images (List[str]):
  • created_by (Union[Unset, None, str]):
  • edge_bundles (Union[Unset, PublishPipelineResponse202EdgeBundles]):
  • engine_url (Union[Unset, None, str]):
  • error (Union[Unset, None, str]): If [PipelinePublish::status] is in the [PipelinePublishStatus::Error] state, this should be populated with the error that occurred.
  • helm (Union[Unset, None, PipelinePublishHelm]):
  • pipeline_url (Union[Unset, None, str]):
  • pipeline_version_name (Union[Unset, None, str]):

PipelinePublish(client: wallaroo.client.Client, chart_url=None, **data)

Method generated by attrs for class PublishPipelineResponse202.

def add_edge( self, name: str, tags: List[str] = []) -> wallaroo.pipeline_publish.PipelinePublish:

Add new edge to a published pipeline.

def remove_edge(self, name: str):

Remove an edge from a published pipeline.

Parameters
  • str name: The name of the edge that will be removed. This is not limited to this pipeline.
Inherited Members
wallaroo.wallaroo_ml_ops_api_client.models.publish_pipeline_response_202.PublishPipelineResponse202
to_dict
from_dict
additional_keys
class PipelinePublishList(typing.List[wallaroo.pipeline_publish.PipelinePublish]):

Wraps a list of published pipelines for display in a display-aware environment like Jupyter.

Inherited Members
builtins.list
list
clear
copy
append
insert
extend
pop
remove
index
count
reverse
sort

29 - wallaroo.pipeline_version

class PipelineVersion(wallaroo.object.Object):

A specific version of a Pipeline. This usually reflects a change to the Pipeline Definition.

PipelineVersion(client: Optional[wallaroo.client.Client], data: Dict[str, Any])

Base constructor.

Each object requires:

  • a GraphQL client - in order to fill its missing members dynamically
  • an initial data blob - typically from unserialized JSON, containing at least the data for required members (typically the object's primary key) and optionally other data members.
def id(self) -> int:
def create_time(*args, **kwargs):
def last_update_time(*args, **kwargs):
def name(*args, **kwargs):
def definition(*args, **kwargs):
def pipeline(*args, **kwargs):
def deployments(*args, **kwargs):
def model_configs(*args, **kwargs):
def publishes(*args, **kwargs):
def deploy( self, deployment_name: str, model_configs: List[wallaroo.model_config.ModelConfig], config: Optional[wallaroo.deployment_config.DeploymentConfig] = None) -> wallaroo.deployment.Deployment:

Deploys this PipelineVersion.

Parameters
  • str deployment_name: Name of the new Deployment. Must be unique across all deployments.
  • List[ModelConfig] model_configs: List of the configured models to use. These must be the same ModelConfigs used when creating the Pipeline.
  • Optional[DeploymentConfig] config: Deployment configuration to use.
Returns

A Deployment object for the resulting deployment.

def publish( self, deployment_config: Optional[wallaroo.deployment_config.DeploymentConfig] = None, replaces: Optional[List[Union[int, wallaroo.pipeline_publish.PipelinePublish]]] = None):

Publish a pipeline version.

class PipelineVersionList(typing.List[wallaroo.pipeline_version.PipelineVersion]):

Wraps a list of pipeline versions for display in a display-aware environment like Jupyter.

Inherited Members
builtins.list
list
clear
copy
append
insert
extend
pop
remove
index
count
reverse
sort

30 - wallaroo.queries

def named(name: str) -> str:

31 - wallaroo.tag

Tags that may be attached to models and pipelines.

Tag( client: Optional[wallaroo.client.Client], data: Dict[str, Any], standalone=False)

Base constructor.

Each object requires:

  • a GraphQL client - in order to fill its missing members dynamically
  • an initial data blob - typically from unserialized JSON, containing at least the data for required members (typically the object's primary key) and optionally other data members.
def id(self) -> int:
def tag(*args, **kwargs):
def model_tags(*args, **kwargs):
def pipeline_tags(*args, **kwargs):
def list_model_versions(self) -> List[wallaroo.model_version.ModelVersion]:

Lists the model versions this tag is on.

def add_to_model(self, model_id: int):
def remove_from_model(self, model_id: int):
def list_pipelines(self) -> List[wallaroo.pipeline.Pipeline]:

Lists the pipelines this tag is on.

def add_to_pipeline(self, pipeline_id: int):
def remove_from_pipeline(self, pipeline_id: int):
class Tags(typing.List[wallaroo.tag.Tag]):

Wraps a list of tags for display in a display-aware environment like Jupyter.

Inherited Members
builtins.list
list
clear
copy
append
insert
extend
pop
remove
index
count
reverse
sort

32 - wallaroo.task

Base class for all backend GraphQL API objects.

This class serves as a framework for API objects to be constructed based on a partially-complete JSON response, and to fill in their remaining members dynamically if needed.

Task( client: wallaroo.client.Client, data: Dict[str, Any], standalone=False)

Base constructor.

Each object requires:

  • a GraphQL client - in order to fill its missing members dynamically
  • an initial data blob - typically from unserialized JSON, containing at least the data for required members (typically the object's primary key) and optionally other data members.
def kill(self):

Kill this Task.

@staticmethod
def list_tasks( client: wallaroo.client.Client, workspace_id: int, killed: bool = False):
@staticmethod
def get_task_by_id(client: wallaroo.client.Client, task_id: str):
def id(*args, **kwargs):
def name(*args, **kwargs):
def workspace_id(*args, **kwargs):
def input_data(*args, **kwargs):
def status(*args, **kwargs):
def task_type(*args, **kwargs):
def created_at(*args, **kwargs):
def updated_at(*args, **kwargs):
def last_runs( self, limit: Optional[int] = None, status: Optional[str] = None) -> wallaroo.task_run.TaskRunList:

Return the runs associated with this task.

Parameters
  • limit int The number of runs to return
  • status str Return only runs with the matching status. One of "success", "failure", "running", "all"
class TaskList(typing.List[wallaroo.task.Task]):

Wraps a list of tasks for display in a display-aware environment like Jupyter.

Inherited Members
builtins.list
list
clear
copy
append
insert
extend
pop
remove
index
count
reverse
sort

33 - wallaroo.task_run

class TaskRun(wallaroo.object.Object):

Base class for all backend GraphQL API objects.

This class serves as a framework for API objects to be constructed based on a partially-complete JSON response, and to fill in their remaining members dynamically if needed.

TaskRun( client: wallaroo.client.Client, data: Dict[str, Any], standalone=False)

Base constructor.

Each object requires:

  • a GraphQL client - in order to fill its missing members dynamically
  • an initial data blob - typically from unserialized JSON, containing at least the data for required members (typically the object's primary key) and optionally other data members.
def logs(self, limit: Optional[int] = None) -> wallaroo.task_run.TaskRunLogs:

Returns the application logs for the given Task Run. These may be print or Exception logs produced while running your Orchestration.

Note: The default retention policy for Orchestration logs is 30 days.

Parameters
  • limit int Limits the number of lines of logs returned. Starts from the most recent logs.
Returns

A List of str. Each str represents a newline-separated entry from the Task's log.

class TaskRunList(typing.List[wallaroo.task_run.TaskRun]):

Wraps a list of task runs for display in a display-aware environment like Jupyter.

Inherited Members
builtins.list
list
clear
copy
append
insert
extend
pop
remove
index
count
reverse
sort
class TaskRunLogs(typing.List[str]):

This is a list of logs associated with a Task run.

Inherited Members
builtins.list
list
clear
copy
append
insert
extend
pop
remove
index
count
reverse
sort

34 - wallaroo.unwrap

def unwrap(v: Optional[~T]) -> ~T:

Simple function to placate pylance

35 - wallaroo.user

class User:

A platform User.

User(client, data: Dict[str, Any], standalone=False)
def id(self) -> str:
def email(self) -> str:
def username(self) -> str:
def enabled(self) -> bool:
@staticmethod
def list_users( auth, api_endpoint: str = 'http://api-lb:8080', auth_endpoint: str = 'http://api-lb:8080'):
@staticmethod
def get_email_by_id(client: wallaroo.client.Client, id: str):
@staticmethod
def invite_user( email, password, auth, api_endpoint: str = 'http://api-lb:8080', auth_endpoint: str = 'http://api-lb:8080'):

36 - wallaroo.user_type

class UserType(builtins.str, enum.Enum):

Represents a workspace user's role.

OWNER = <UserType.OWNER: 'owner'>
COLLABORATOR = <UserType.COLLABORATOR: 'collaborator'>
@staticmethod
def from_str(label: str):

Creates a UserType from a str

Inherited Members
enum.Enum
name
value
builtins.str
encode
replace
split
rsplit
join
capitalize
casefold
title
center
count
expandtabs
find
partition
index
ljust
lower
lstrip
rfind
rindex
rjust
rstrip
rpartition
splitlines
strip
swapcase
translate
upper
startswith
endswith
removeprefix
removesuffix
isascii
islower
isupper
istitle
isspace
isdecimal
isdigit
isnumeric
isalpha
isalnum
isidentifier
isprintable
zfill
format
format_map
maketrans

37 - wallaroo.utils

def is_arrow_enabled():
def is_models_enabled():
def flatten_np_array_columns(df, col):
def convert_ndarray_batch_to_arrow(arr):
def generate_file_name(directory: str, file_prefix: str, file_num: int, file_suffix: str) -> str:
def create_new_arrow_file( directory: str, file_num: int, file_prefix: str, file_suffix: str, schema: pyarrow.lib.Schema) -> pyarrow.ipc.RecordBatchFileWriter:
def create_new_json_file( directory: str, file_num: int, file_prefix: str, file_suffix: str) -> TextIO:
def create_new_file( directory: str, file_num: int, file_prefix: str, schema: Optional[pyarrow.lib.Schema] = None, arrow: Optional[bool] = False) -> Union[pyarrow.ipc.RecordBatchFileWriter, TextIO]:
def write_to_file( record_batch: Union[pyarrow.lib.RecordBatch, str], writer: Union[pyarrow.ipc.RecordBatchFileWriter, TextIO]) -> None:

38 - wallaroo.version

39 - wallaroo.visibility

40 - wallaroo.workspace

class Workspace(wallaroo.object.Object):

Workspace provides a user and visibility context for access to models and pipelines.

Workspace( client: Optional[wallaroo.client.Client], data: Dict[str, Any], standalone=False)

Base constructor.

Each object requires:

  • a GraphQL client - in order to fill its missing members dynamically
  • an initial data blob - typically from unserialized JSON, containing at least the data for required members (typically the object's primary key) and optionally other data members.
def to_json(self):
def id(self) -> int:
def name(*args, **kwargs):
def archived(*args, **kwargs):
def created_at(*args, **kwargs):
def created_by(*args, **kwargs):
def models(*args, **kwargs):

Returns a List of Models objects that have been created in this workspace.

def pipelines(*args, **kwargs):
def users(*args, **kwargs):
def add_user(self, user_email: str) -> wallaroo.workspace.Workspace:

Add a user to workspace as participant

def add_owner(self, user_email: str) -> wallaroo.workspace.Workspace:

Add a user to workspace as owner

def remove_user(self, user_email: str):
def list_connections(self) -> wallaroo.connection.ConnectionList:

Return a list of Connections available in this Workspace.

Returns

List of Connections in this Workspace.

def add_connection(self, name: str):

Adds an existing Connection with the given name to this Workspace.

def remove_connection(self, name: str):

Removes a Connection with the given name from this Workspace.

class Workspaces(typing.List[wallaroo.workspace.Workspace]):

Wraps a list of workspaces for display.

Inherited Members
builtins.list
list
clear
copy
append
insert
extend
pop
remove
index
count
reverse
sort