wallaroo.assay_config


class BinMode(builtins.str, enum.Enum):

How the bins are calculated.
NONE - no bins. Only useful if we only care about the mean, median, etc.
EQUAL - evenly spaced bins, each of width (max - min) / num_bins.
QUANTILE - based on percentages. If num_bins is 5 then quintiles are used, so bin edges are created at the 20%, 40%, 60%, 80% and 100% points.
PROVIDED - the user provides the edge points for the bins.

NONE = <BinMode.NONE: 'None'>
EQUAL = <BinMode.EQUAL: 'Equal'>
QUANTILE = <BinMode.QUANTILE: 'Quantile'>
PROVIDED = <BinMode.PROVIDED: 'Provided'>
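
To make the EQUAL and QUANTILE modes concrete, here is a minimal pure-Python sketch of how right-hand bin edges could be derived from baseline values. The function names `equal_edges` and `quantile_edges` are hypothetical, and the nearest-rank quantile rule is an assumption made for illustration. This is not Wallaroo's implementation, which may interpolate or treat ties differently.

```python
from typing import List


def equal_edges(values: List[float], num_bins: int) -> List[float]:
    """Right-hand edges of num_bins evenly spaced bins over [min, max]."""
    lo, hi = min(values), max(values)
    width = (hi - lo) / num_bins
    return [lo + width * (i + 1) for i in range(num_bins)]


def quantile_edges(values: List[float], num_bins: int) -> List[float]:
    """Right-hand edges at the 1/num_bins, 2/num_bins, ..., 1.0 quantiles.

    Uses the nearest-rank rule (an assumption made for this sketch).
    """
    ordered = sorted(values)
    n = len(ordered)
    edges = []
    for i in range(1, num_bins + 1):
        # Integer form of ceil(i * n / num_bins), clamped to at least rank 1.
        rank = max(1, (i * n + num_bins - 1) // num_bins)
        edges.append(ordered[rank - 1])
    return edges


# A skewed baseline: one large value stretches the range.
baseline = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 100.0]
print(equal_edges(baseline, 5))
print(quantile_edges(baseline, 5))
```

With a skewed baseline like this one, equal-width bins leave nine of the ten values in the first bin, while quantile edges place two values in each bin.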
class Aggregation(builtins.str, enum.Enum):

What we use to calculate the score.
EDGES - distances between the edges.
DENSITY - percentage of values that fall in each bin.
CUMULATIVE - cumulative percentage of values that fall in the bins.

EDGES = <Aggregation.EDGES: 'Edges'>
DENSITY = <Aggregation.DENSITY: 'Density'>
CUMULATIVE = <Aggregation.CUMULATIVE: 'Cumulative'>
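
The difference between DENSITY and CUMULATIVE can be sketched as follows, assuming bins are described by sorted, inclusive right-hand edges plus one extra bin for values above the last edge. The helper names `density` and `cumulative` are hypothetical and this is only an illustration of the two aggregations, not the library's code. EDGES is not covered here.

```python
from bisect import bisect_left
from itertools import accumulate
from typing import List


def density(values: List[float], edges: List[float]) -> List[float]:
    """Fraction of values in each bin.

    edges are sorted right-hand (inclusive) bin edges; one extra bin at the
    end collects values larger than the last edge.
    """
    counts = [0] * (len(edges) + 1)
    for v in values:
        # Index of the first edge that is >= v, or len(edges) if none is.
        counts[bisect_left(edges, v)] += 1
    return [c / len(values) for c in counts]


def cumulative(values: List[float], edges: List[float]) -> List[float]:
    """Running total of the per-bin fractions."""
    return list(accumulate(density(values, edges)))


values = [1.0, 2.0, 3.0, 9.0]
edges = [1.0, 5.0]
print(density(values, edges))     # [0.25, 0.5, 0.25]
print(cumulative(values, edges))  # [0.25, 0.75, 1.0]
```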
class Metric(builtins.str, enum.Enum):

How we calculate the score.
MAXDIFF - maximum difference between corresponding bins.
SUMDIFF - sum of differences between corresponding bins.
PSI - Population Stability Index.

MAXDIFF = <Metric.MAXDIFF: 'MaxDiff'>
SUMDIFF = <Metric.SUMDIFF: 'SumDiff'>
PSI = <Metric.PSI: 'PSI'>
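
As a rough illustration of the three metrics, the sketch below compares two lists of aggregated bin values (one for the baseline, one for the window). The function names are hypothetical. The PSI formula is the commonly used textbook definition, and the `eps` clamp for empty bins is an assumption. Whether Wallaroo scales or normalizes any of these scores is not stated in the source.

```python
import math
from typing import List


def max_diff(baseline: List[float], window: List[float]) -> float:
    """Largest absolute difference between corresponding bins."""
    return max(abs(b - w) for b, w in zip(baseline, window))


def sum_diff(baseline: List[float], window: List[float]) -> float:
    """Sum of absolute differences between corresponding bins."""
    return sum(abs(b - w) for b, w in zip(baseline, window))


def psi(baseline: List[float], window: List[float], eps: float = 1e-6) -> float:
    """Population Stability Index: sum of (w - b) * ln(w / b) over bins.

    Bins are clamped to eps to avoid log(0); this clamp is an assumption.
    """
    total = 0.0
    for b, w in zip(baseline, window):
        b, w = max(b, eps), max(w, eps)
        total += (w - b) * math.log(w / b)
    return total


baseline_bins = [0.5, 0.5]
window_bins = [0.25, 0.75]
print(max_diff(baseline_bins, window_bins))  # 0.25
print(sum_diff(baseline_bins, window_bins))  # 0.5
print(psi(baseline_bins, window_bins))
```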
class SummarizerConfig:

The summarizer specifies how the bins of the baseline and window should be compared.

def to_json(self) -> str:
class UnivariateContinousSummarizerConfig(SummarizerConfig):

The UnivariateContinousSummarizer analyzes one input or output feature (univariate) at a time. It expects the values to be continuous, or at least numerous enough to fall into several or all of the bins.

UnivariateContinousSummarizerConfig( bin_mode: BinMode, aggregation: Aggregation, metric: Metric, num_bins: int, bin_weights: Optional[List[float]] = None, bin_width: Optional[float] = None, provided_edges: Optional[List[float]] = None, add_outlier_edges: bool = True)
type
bin_mode
aggregation
metric
num_bins
bin_weights
bin_width
provided_edges
add_outlier_edges
class SummarizerBuilder(abc.ABC):

Abstract base class for summarizer builders. Subclasses implement build() to return a SummarizerConfig.

@abstractmethod
def build(self) -> SummarizerConfig:
class UnivariateContinousSummarizerBuilder(SummarizerBuilder):

Builds the UnivariateContinousSummarizerConfig.

bin_mode
aggregation
metric
num_bins
bin_weights: Optional[List[float]]
bin_width: Optional[float]
provided_edges: Optional[List[float]]
add_outlier_edges
def add_bin_mode( self, bin_mode: BinMode, edges: Optional[List[float]] = None):

Sets the binning mode. If BinMode.PROVIDED is specified a list of edges is also required.

def add_num_bins(self, num_bins: int):

Sets the number of bins. If weights have been set previously, they must first be set to None before the number of bins can be changed.

def add_bin_weights(self, weights: Optional[List[float]]):

Specifies the weighting to be given to the bins. The number of weights must be 2 larger than the number of bins to accommodate outliers smaller and outliers larger than the values seen in the baseline. The values passed in can be whole or real numbers and do not need to add up to 1 or any other specific value, as they are normalized during the score calculation phase. Passing None removes previously specified weights and allows the number of bins to be changed.
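
The length rule and the normalization described above can be sketched in a few lines. `normalize_weights` is a hypothetical helper written for this illustration. The actual validation and normalization happen inside the library and may differ in detail, for example in how an all-zero weight list is handled.

```python
from typing import List, Optional


def normalize_weights(weights: Optional[List[float]], num_bins: int) -> Optional[List[float]]:
    """Validate and normalize bin weights so that they sum to 1.

    Expects num_bins + 2 weights: one per bin plus the two outlier bins.
    None means "no weights" and is passed through unchanged.
    """
    if weights is None:
        return None
    if len(weights) != num_bins + 2:
        raise ValueError(f"expected {num_bins + 2} weights, got {len(weights)}")
    total = sum(weights)
    if total <= 0:
        # Rejecting this case is an assumption made for the sketch.
        raise ValueError("weights must sum to a positive value")
    return [w / total for w in weights]


# Three bins plus two outlier bins; the outlier bins are ignored here.
print(normalize_weights([0, 1, 1, 2, 0], num_bins=3))  # [0.0, 0.25, 0.25, 0.5, 0.0]
```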

def add_metric(self, metric: Metric):

Sets the metric mode.

def add_aggregation(self, aggregation: Aggregation):

Sets the aggregation style.

def add_bin_edges(self, edges: Optional[List[float]]):

Specifies the right-hand side (max value) of the bins. The number of edges must be equal to or one more than the number of bins. When it is equal to the number of bins, the edge for the left outlier bin is calculated from the baseline. When an additional edge is given (one more than the number of bins), the first (lowest) value is used as the max value for the left outlier bin. The max value for the right-hand outlier bin is always Float MAX.
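
The edge-count rule can be illustrated with a small sketch that expands user-provided edges into the full list of right-hand edges, including both outlier bins. `full_edges` is a hypothetical helper. Using the baseline minimum as the calculated left outlier edge is an assumption, since the source only says the edge is "calculated from the baseline".

```python
import sys
from typing import List


def full_edges(edges: List[float], num_bins: int, baseline_min: float) -> List[float]:
    """Right-hand edges for: left outlier bin, num_bins regular bins, right outlier bin."""
    if len(edges) == num_bins:
        # No explicit left outlier edge: derive it from the baseline.
        # Using the baseline minimum here is an assumption of this sketch.
        left = [baseline_min]
    elif len(edges) == num_bins + 1:
        # The first (lowest) provided value already is the left outlier edge.
        left = []
    else:
        raise ValueError("number of edges must equal num_bins or num_bins + 1")
    # The right outlier bin always ends at the largest representable float.
    return left + list(edges) + [sys.float_info.max]


print(full_edges([1.0, 2.0, 3.0], num_bins=3, baseline_min=0.0))
print(full_edges([0.5, 1.0, 2.0, 3.0], num_bins=3, baseline_min=0.0))
```

In both calls the result has num_bins + 2 entries, which matches the number of weights expected by add_bin_weights.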

class WindowConfig:

Configures a window to be compared against the baseline.

WindowConfig( pipeline_name: str, width: str, model_name: Optional[str] = None, start: Optional[datetime.datetime] = None, interval: Optional[str] = None, path: Optional[str] = None, workspace_id: Optional[int] = None, locations: List[str] = [])
pipeline_name
model_name
width
start
interval
path
workspace_id
locations
def to_json(self) -> str:
class BaselineConfig:

Abstract base class for Baseline config objects. Currently only CalculatedBaseline (fixed window) and StaticBaseline are implemented.

def to_json(self) -> str:
class CalculatedBaseline(BaselineConfig):

The CalculatedBaseline is calculated from the inferences from a specific time window.

CalculatedBaseline( pipeline_name: str, model_name: str, start: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, locations: List[str] = [])
calculated
class FixedBaseline(CalculatedBaseline):

The FixedBaseline is calculated from the inferences from a specific time window.

FixedBaseline( pipeline_name: str, model_name: str, start: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, locations: List[str] = [])
calculated
class StaticBaseline(BaselineConfig):

The StaticBaseline is pre-calculated data from the inferences in a specific time window.

StaticBaseline( count: int, min_: float, max_: float, mean: float, median: float, std: float, edges: List[float], edge_names: List[str], aggregated_values: List[float], aggregation: Aggregation, start: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None)
static
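
For orientation, the summary statistics that a StaticBaseline carries (count, min_, max_, mean, median, std) could be computed from a vector of baseline values as in the sketch below. `summarize` is a hypothetical helper, and the use of the population standard deviation is an assumption. The library may compute these values differently.

```python
import statistics
from typing import Dict, List


def summarize(values: List[float]) -> Dict[str, float]:
    """Summary statistics named after the StaticBaseline constructor arguments."""
    return {
        "count": len(values),
        "min_": min(values),
        "max_": max(values),
        "mean": statistics.fmean(values),
        "median": statistics.median(values),
        # Population standard deviation; sample std would be statistics.stdev.
        "std": statistics.pstdev(values),
    }


stats = summarize([1.0, 2.0, 3.0, 4.0])
print(stats)
```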
class BaselineBuilder(abc.ABC):

Abstract base class for baseline builders. Subclasses implement build() to return a BaselineConfig.

@abstractmethod
def build(self) -> BaselineConfig:
def to_json(self) -> str:
class VectorBaselineBuilder(BaselineBuilder):

Helps create a config object for a VectorBaseline.

VectorBaselineBuilder( client: wallaroo.client.Client, pipeline_name: str, alert_threshold: Optional[float], warning_threshold: Optional[float], pipeline_id: Optional[int], workspace_name: Optional[str])
client
baseline_data: Optional[List[float]]
window: Optional[WindowConfig]
summarizer: Optional[SummarizerConfig]
pipeline_name
alert_threshold
warning_threshold
pipeline_id
workspace_name
def add_baseline_data( self, baseline_data: numpy.ndarray) -> VectorBaselineBuilder:

Add the baseline data.

def add_summarizer( self, summarizer: SummarizerConfig) -> VectorBaselineBuilder:

Add the summarizer.

def add_window( self, window: WindowConfig) -> VectorBaselineBuilder:

Add the window.

def add_workspace_id(self, workspace_id: int) -> VectorBaselineBuilder:

Add the workspace id.

def build(self) -> StaticBaseline:

Create the StaticBaseline object.

class FixedWindowBaselineBuilder(BaselineBuilder):

Helps to easily create the config object for a FixedBaseline.

FixedWindowBaselineBuilder( client: wallaroo.client.Client, pipeline_name: str, alert_threshold: Optional[float] = None, warning_threshold: Optional[float] = None, pipeline_id: Optional[int] = None, workspace_name: Optional[str] = None)
client
pipeline_name
model_name: Optional[str]
start: Optional[datetime.datetime]
end: Optional[datetime.datetime]
iopath: Optional[str]
workspace_id: Optional[int]
summarizer: Optional[SummarizerConfig]
locations: List[str]
alert_threshold
warning_threshold
pipeline_id
workspace_name
window: Optional[WindowConfig]
def add_model_name( self, model_name: str) -> FixedWindowBaselineBuilder:

Specify the model to use in the baseline

def add_start( self, start: datetime.datetime) -> FixedWindowBaselineBuilder:

Specify the start of the window for the baseline

def add_end( self, end: datetime.datetime) -> FixedWindowBaselineBuilder:

Specify the end of the window for the baseline

def add_iopath(self, iopath: str) -> FixedWindowBaselineBuilder:

Specify the path to the inference data

def add_location_filter( self, locations: List[str]) -> FixedWindowBaselineBuilder:
def add_workspace_id( self, workspace_id: int) -> FixedWindowBaselineBuilder:

Specify the workspace id for the inference data

def add_summarizer( self, summarizer: SummarizerConfig) -> FixedWindowBaselineBuilder:

Specify the summarizer to use

def add_window( self, window: WindowConfig) -> FixedWindowBaselineBuilder:

Specify the window to use

def build(self) -> StaticBaseline:

Create the FixedBaseline object.

class WindowBuilder:

Helps build a WindowConfig. model and width are required but there are no good default values for them because they depend on the baseline. We leave it up to the assay builder to configure the window correctly after it is created.

WindowBuilder(pipeline_name: str)
pipeline_name
model_name: Optional[str]
width: Optional[str]
start: Optional[datetime.datetime]
interval: Optional[str]
path: Optional[str]
workspace_id: Optional[int]
locations: List[str]
def add_model_name(self, model_name: str):

The model name (model_id) that the window should analyze.

def add_width(self, **kwargs: int):

The width of the window to use when collecting data for analysis.

def add_interval(self, **kwargs: int):

The interval between the start of one window and the start of the next.
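
Both add_width and add_interval take integer keyword arguments, while WindowConfig stores width and interval as strings. One plausible way to bridge the two is sketched below. The helper `duration_to_str`, the set of accepted units, and the "<value> <unit>" string format are all assumptions made for illustration and are not confirmed by the source.

```python
def duration_to_str(**kwargs: int) -> str:
    """Turn a single keyword such as hours=24 into a string like '24 hours'."""
    allowed = ("minutes", "hours", "days", "weeks")  # assumed set of units
    if len(kwargs) != 1:
        raise ValueError("exactly one duration keyword is expected")
    ((unit, value),) = kwargs.items()
    if unit not in allowed:
        raise ValueError(f"unit must be one of {allowed}, got {unit!r}")
    return f"{value} {unit}"


print(duration_to_str(hours=24))    # 24 hours
print(duration_to_str(minutes=30))  # 30 minutes
```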

def add_location_filter(self, locations: List[str] = []):
def add_start(self, start: datetime.datetime):
def add_path(self, path: str):
def add_workspace_id(self, workspace_id: int):
def build(self) -> WindowConfig:
def ConfigEncoder(o):

Used to format datetimes as we need when encoding to JSON
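
A function with this shape is typically passed as the `default` argument of `json.dumps`. The sketch below shows the general pattern with a hypothetical `encode_default` function and a hypothetical `DemoWindow` class. The ISO 8601 output format is an assumption, since the source does not state which datetime format ConfigEncoder produces.

```python
import json
from datetime import datetime, timezone


def encode_default(o):
    """Fallback encoder for objects json does not know how to serialize."""
    if isinstance(o, datetime):
        return o.isoformat()  # assumed format for this sketch
    # Plain config objects are encoded via their attribute dictionary.
    return o.__dict__


class DemoWindow:
    """Hypothetical stand-in for a config object."""

    def __init__(self, width: str, start: datetime):
        self.width = width
        self.start = start


window = DemoWindow("24 hours", datetime(2023, 1, 1, tzinfo=timezone.utc))
encoded = json.dumps(window, default=encode_default)
print(encoded)  # {"width": "24 hours", "start": "2023-01-01T00:00:00+00:00"}
```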

class AssayConfig:

Configuration for an Assay record.

AssayConfig( client: wallaroo.client.Client, name: str, pipeline_id: int, pipeline_name: str, active: bool, status: str, baseline: BaselineConfig, window: WindowConfig, summarizer: SummarizerConfig, warning_threshold: Optional[float], alert_threshold: float, run_until: Optional[datetime.datetime], workspace_id: Optional[int], workspace_name: Optional[str])
client
name
pipeline_id
pipeline_name
active
status
baseline
window
summarizer
warning_threshold
alert_threshold
run_until
workspace_id
workspace_name
def to_json(self) -> str:
def interactive_run(self) -> wallaroo.custom_types.IAssayAnalysisList:

Runs this assay interactively. The assay is not saved to the database, nor are analysis records saved to a Plateau topic. Useful for exploring pipeline inference data and experimenting with thresholds.

def interactive_baseline_run(self) -> Optional[wallaroo.custom_types.IAssayAnalysis]:
def interactive_input_run_arrow( self, inferences: pandas.core.frame.DataFrame, labels: Optional[List[str]]) -> wallaroo.assay.AssayAnalysisList:
def interactive_input_run_legacy( self, inferences: List[Dict], labels: Optional[List[str]]) -> wallaroo.assay.AssayAnalysisList:
def interactive_input_run( self, inferences: Union[List[Dict], pandas.core.frame.DataFrame], labels: Optional[List[str]]) -> wallaroo.assay.AssayAnalysisList:

Analyzes the given inputs to create an interactive run for each feature column. The assay is not saved to the database, nor are analysis records saved to a Plateau topic. Useful for exploring inputs for possible causes when a difference is detected in the output.

class AssayBuilder:

Helps build an AssayConfig

AssayBuilder( client: wallaroo.client.Client, name: str, pipeline_id: int, pipeline_name: str, iopath: str, model_name: Optional[str] = None, baseline_start: Optional[datetime.datetime] = None, baseline_end: Optional[datetime.datetime] = None, baseline_data: Optional[numpy.ndarray] = None)
client
name
pipeline_id
pipeline_name: str
active
status
iopath
baseline: Optional[BaselineConfig]
baseline_builder: Union[FixedWindowBaselineBuilder, VectorBaselineBuilder, NoneType]
window: Optional[WindowConfig]
summarizer: Optional[SummarizerConfig]
warning_threshold: Optional[float]
alert_threshold: float
run_until: Optional[datetime.datetime]
workspace_id
workspace_name
window_builder_
summarizer_builder
baseline_data
def baseline_dataframe(self):
def baseline_histogram( self, bins: Union[str, int, NoneType] = None, log_scale: bool = False):
def baseline_kde(self, log_scale: bool = False):
def baseline_ecdf(self, log_scale: bool = False):
def build(self) -> AssayConfig:
def upload(self) -> int:
def add_name(self, name: str):

Specify the assay name

def add_active(self, active: bool):

Specify if the assay is active or not

def add_iopath(self, iopath: str):

Specify what the assay should analyze. Should start with input or output and have indexes (zero based) into row and column: For example 'input 0 1' specifies the second column of the first input.
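
The path format described above can be illustrated with a small parser. `parse_iopath` is a hypothetical helper that only handles the three-part form shown in the example ('input 0 1'). Any other path forms the library may accept are not covered.

```python
from typing import Tuple


def parse_iopath(iopath: str) -> Tuple[str, int, int]:
    """Split a path such as 'input 0 1' into (direction, row, column)."""
    parts = iopath.split()
    if len(parts) != 3 or parts[0] not in ("input", "output"):
        raise ValueError(f"unsupported iopath: {iopath!r}")
    row, column = int(parts[1]), int(parts[2])
    if row < 0 or column < 0:
        raise ValueError("indexes are zero based and cannot be negative")
    return parts[0], row, column


# 'input 0 1' is the second column (index 1) of the first input (index 0).
print(parse_iopath("input 0 1"))   # ('input', 0, 1)
print(parse_iopath("output 0 0"))  # ('output', 0, 0)
```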

def add_location_filter(self, locations: List[str]):
def fixed_baseline_builder(self):

Creates a fixed baseline builder for this assay builder.

def add_baseline(self, baseline: BaselineConfig):

Adds a specific baseline created elsewhere.

def window_builder(self):

Returns this assay builder's window builder.

def add_window(self, window: WindowConfig):

Adds a window created elsewhere.

def univariate_continuous_summarizer(self) -> UnivariateContinousSummarizerBuilder:

Creates and adds a univariate continuous summarizer (UCS) builder to this assay builder.

def add_summarizer(self, summarizer: SummarizerConfig):

Adds the summarizer created elsewhere to this builder.

def add_warning_threshold(self, warning_threshold: float):

Specify the warning threshold for this assay.

def add_alert_threshold(self, alert_threshold: float):

Specify the alert threshold for this assay.
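
How a score might be compared against the two thresholds is sketched below. The helper `classify`, the status labels, and the inclusive comparison are assumptions made for illustration. The source only establishes that the warning threshold is optional and the alert threshold is a float.

```python
from typing import Optional


def classify(score: float, alert_threshold: float,
             warning_threshold: Optional[float] = None) -> str:
    """Map a score to 'alert', 'warning' or 'ok' (labels are hypothetical)."""
    if score >= alert_threshold:
        return "alert"
    if warning_threshold is not None and score >= warning_threshold:
        return "warning"
    return "ok"


print(classify(0.05, alert_threshold=0.25, warning_threshold=0.1))  # ok
print(classify(0.15, alert_threshold=0.25, warning_threshold=0.1))  # warning
print(classify(0.30, alert_threshold=0.25, warning_threshold=0.1))  # alert
```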

def add_run_until(self, run_until: datetime.datetime):

How long this assay should run. Primarily useful for interactive runs to limit the number of analyses.

def calc_bins(num_samples: int, bins: Union[str, int, NoneType]) -> Union[str, int]:

If the user specifies a number of bins or a strategy for calculating it, use that. Otherwise use the minimum of the square root of the number of samples and 50.
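
The fallback rule can be written out as a short sketch. `calc_bins_sketch` is a hypothetical re-implementation based only on the description above. Rounding the square root down to an integer is an assumption.

```python
import math
from typing import Union


def calc_bins_sketch(num_samples: int, bins: Union[str, int, None] = None) -> Union[str, int]:
    """Use the caller's bins if given, else min(sqrt(num_samples), 50)."""
    if bins is not None:
        # Either an explicit count or a strategy name such as "auto".
        return bins
    return min(math.isqrt(num_samples), 50)


print(calc_bins_sketch(100))           # 10
print(calc_bins_sketch(10_000))        # 50
print(calc_bins_sketch(100, "auto"))   # auto
```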