class Client:
Client handle to a Wallaroo platform instance.
Objects of this class serve as the entrypoint to Wallaroo platform
functionality.
Client( api_endpoint: str = 'http://api-lb:8080', auth_endpoint: str = '', request_timeout: Optional[int] = None, auth_type: Optional[str] = None, gql_client: Optional[gql.client.Client] = None, interactive: Optional[bool] = None, time_format: str = '%Y-%d-%b %H:%M:%S')
Create a Client handle.
Parameters
- str api_endpoint: Host/port of the platform API endpoint
- str auth_endpoint: Host/port of the platform Keycloak instance
- int request_timeout: Max timeout of web requests, in seconds
- str auth_type: Authentication type to use. Can be one of: "none",
"sso", "user_password".
- bool interactive: If provided and True, some calls will print additional human information, or won't when False. If not provided, interactive defaults to True if running inside Jupyter and False otherwise.
- str time_format: Preferred strftime format string for displaying timestamps in a human context.
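For illustration, the default time_format is an ordinary strftime pattern, so its output can be previewed directly with the standard library:

```python
from datetime import datetime

# Preview the Client's default time_format, '%Y-%d-%b %H:%M:%S':
# year, day-of-month, abbreviated month name, then time.
ts = datetime(2022, 4, 19, 13, 17, 59)
print(ts.strftime("%Y-%d-%b %H:%M:%S"))  # 2022-19-Apr 13:17:59
```

Passing a different pattern (e.g. '%Y-%m-%d %H:%M:%S') as time_format changes how timestamps are rendered in human-facing output.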
@staticmethod
def get_urls( auth_type: Optional[str], api_endpoint: str, auth_endpoint: str) -> Tuple[Optional[str], str, str]:
Calculates the auth values specified as defaults, as params, or in environment variables.
Made static to be testable without reaching out to SSO, etc.
def list_models(self) -> wallaroo.models.ModelsList:
List all models on the platform.
Returns
A list of all models on the platform.
def list_deployments(self) -> List[wallaroo.deployment.Deployment]:
List all deployments (active or not) on the platform.
Returns
A list of all deployments on the platform.
def search_pipelines( self, search_term: Optional[str] = None, deployed: Optional[bool] = None, created_start: Optional[Datetime] = None, created_end: Optional[Datetime] = None, updated_start: Optional[Datetime] = None, updated_end: Optional[Datetime] = None) -> wallaroo.pipeline_variant.PipelineVariants:
Search for pipelines. All parameters are optional; if none are given, the result is the same as list_pipelines(). All times are strings to be parsed by datetime.isoformat. Example:
myclient.search_pipelines(created_end='2022-04-19 13:17:59+00:00', search_term="foo")
Parameters
- str search_term: Will be matched against tags and model names. Example: "footag123".
- bool deployed: Pipeline was deployed or not
- str created_start: Pipeline was created at or after this time
- str created_end: Pipeline was created at or before this time
- str updated_start: Pipeline was updated at or after this time
- str updated_end: Pipeline was updated at or before this time
Returns
A list of all pipelines on the platform.
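Since the time filters are strings parsed by datetime.isoformat, they can be built with the standard library; the window below and the commented client call are illustrative:

```python
from datetime import datetime, timezone

# Hypothetical search window: pipelines created during April 2022 (UTC).
created_start = datetime(2022, 4, 1, tzinfo=timezone.utc).isoformat()
created_end = datetime(2022, 4, 30, 23, 59, 59, tzinfo=timezone.utc).isoformat()
print(created_start)  # 2022-04-01T00:00:00+00:00

# myclient.search_pipelines(created_start=created_start,
#                           created_end=created_end, search_term="foo")
```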
def search_my_models( self, search_term: Optional[str] = None, uploaded_time_start: Optional[Datetime] = None, uploaded_time_end: Optional[Datetime] = None) -> wallaroo.model.ModelVersions:
Search models owned by you.
Parameters
search_term: Searches the following metadata: names, shas, versions, file names, and tags
uploaded_time_start: Inclusive time of upload
uploaded_time_end: Inclusive time of upload
def search_models( self, search_term: Optional[str] = None, uploaded_time_start: Optional[Datetime] = None, uploaded_time_end: Optional[Datetime] = None) -> wallaroo.model.ModelVersions:
Search all models you have access to.
Parameters
search_term: Searches the following metadata: names, shas, versions, file names, and tags
uploaded_time_start: Inclusive time of upload
uploaded_time_end: Inclusive time of upload
def get_user_by_email(self, email: str) -> Optional[wallaroo.user.User]:
Fetch a User by email address, or None if no such user exists.
def deactivate_user(self, email: str) -> None:
Deactivates an existing user of the platform
Deactivated users cannot log into the platform.
Deactivated users do not count towards the number of allotted user seats from the license.
The Models and Pipelines owned by the deactivated user are not removed from the platform.
Parameters
- str email: The email address of the user to deactivate.
Returns
None
def activate_user(self, email: str) -> None:
Activates an existing user of the platform that had been previously deactivated.
Activated users can log into the platform.
Parameters
- str email: The email address of the user to activate.
Returns
None
def list_users(self) -> List[wallaroo.user.User]:
List all Users on the platform.
Returns
A list of all Users on the platform.
def upload_model( self, name: str, path: Union[str, pathlib.Path], framework: Optional[wallaroo.framework.Framework] = None, input_schema: Optional[pyarrow.lib.Schema] = None, output_schema: Optional[pyarrow.lib.Schema] = None, convert_wait: Optional[bool] = True) -> wallaroo.model.Model:
Upload a model defined by a file as a new model variant.
Parameters
- name: str The name of the model of which this is a variant.
Names must be ASCII alpha-numeric characters or dash (-) only.
- path: Union[str, pathlib.Path] Path of the model file to upload.
- framework: Optional[Framework] The model's framework.
Use a member of the Framework enum. Example: Framework.PYTORCH, Framework.TENSORFLOW
- input_schema: Optional pa.Schema Input schema, required for flavors other than ONNX, Tensorflow, and Python
- output_schema: Optional pa.Schema Output schema, required for flavors other than ONNX, Tensorflow, and Python
- convert_wait: Optional bool Defaults to True. If True, the method waits for model conversion to finish before returning; if False, it returns immediately.
Returns
The created Model.
def register_model_image(self, name: str, image: str) -> wallaroo.model.Model:
Registers an MLFlow model as a new model.
Parameters
- str name: The name of the model of which this is a variant.
Names must be ASCII alpha-numeric characters or dash (-) only.
- str image: Image name of the MLFlow model to register.
Returns
The created Model.
def model_by_name(self, model_class: str, model_name: str) -> wallaroo.model.Model:
Fetch a Model by name.
Parameters
- str model_class: Name of the model class.
- str model_name: Name of the variant within the specified model class.
Returns
The Model with the corresponding model and variant name.
def deployment_by_name(self, deployment_name: str) -> wallaroo.deployment.Deployment:
Fetch a Deployment by name.
Parameters
- str deployment_name: Name of the deployment.
Returns
The Deployment with the corresponding name.
def pipelines_by_name(self, pipeline_name: str) -> List[wallaroo.pipeline.Pipeline]:
Fetch Pipelines by name.
Parameters
- str pipeline_name: Name of the pipeline.
Returns
A list of Pipelines with the corresponding name.
def list_pipelines(self) -> List[wallaroo.pipeline.Pipeline]:
List all pipelines on the platform.
Returns
A list of all pipelines on the platform.
def build_pipeline(self, pipeline_name: str) -> wallaroo.pipeline.Pipeline:
Starts building a pipeline with the given pipeline_name, returning a PipelineConfigBuilder.
When completed, the pipeline can be uploaded with .upload().
Parameters
- pipeline_name string: Name of the pipeline, must be composed of ASCII
alpha-numeric characters plus dash (-).
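The name constraint can be checked up front; valid_pipeline_name below is a hypothetical helper mirroring the documented rule, not part of the SDK:

```python
import re

# ASCII alpha-numeric characters plus dash (-), per the documented rule.
def valid_pipeline_name(name: str) -> bool:
    return re.fullmatch(r"[A-Za-z0-9-]+", name) is not None

print(valid_pipeline_name("fraud-detector-1"))  # True
print(valid_pipeline_name("bad_name!"))         # False

# builder = client.build_pipeline("fraud-detector-1")
# ... configure the builder, then: pipeline = builder.upload()
```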
def create_value_split_experiment( self, name: str, meta_key: str, default_model: wallaroo.model_config.ModelConfig, challenger_models: List[Tuple[Any, wallaroo.model_config.ModelConfig]]) -> wallaroo.pipeline.Pipeline:
Creates a new PipelineVariant of a "value-split experiment" type.
Parameters
- str name: Name of the Pipeline
- meta_key str: Inference input key used to route inputs to
experiment models.
- default_model ModelConfig: Model to send inferences by default.
- challenger_models List[Tuple[Any, ModelConfig]]: A list of
(meta_key value, Model) pairs. If the inference data referred to by
meta_key is equal to one of the keys in this list, that inference is
redirected to the corresponding model instead of the default model.
@staticmethod
def cleanup_arrow_data_for_display(arrow_data: pyarrow.lib.Table) -> pyarrow.lib.Table:
Cleans up the inference result and log data from engine / plateau for display (UX) purposes.
def get_logs( self, topic: str, limit: Optional[int] = None, start_datetime: Optional[datetime.datetime] = None, end_datetime: Optional[datetime.datetime] = None, dataset: Optional[List[str]] = None, dataset_exclude: Optional[List[str]] = None, dataset_separator: Optional[str] = None, directory: Optional[str] = None, file_prefix: Optional[str] = None, data_size_limit: Optional[str] = None, arrow: Optional[bool] = False) -> Tuple[Union[pyarrow.lib.Table, pandas.core.frame.DataFrame, wallaroo.logs.LogEntries, NoneType], Optional[str]]:
Get logs for the given topic.
Parameters
- topic: str The topic to get logs for.
- limit: Optional[int] The maximum number of logs to return.
- start_datetime: Optional[datetime] The start time to get logs for.
- end_datetime: Optional[datetime] The end time to get logs for.
- dataset: Optional[List[str]] By default this is set to ["*"], which returns
["time", "in", "out", "check_failures"]. Other available options: ["metadata"].
- dataset_exclude: Optional[List[str]] If set, allows user to exclude parts of dataset.
- dataset_separator: Optional[Union[Sequence[str], str]] If set to ".", return dataset will be flattened.
- directory: Optional[str] If set, logs will be exported to a file in the given directory.
- file_prefix: Optional[str] Prefix to name the exported file. Required if directory is set.
- data_size_limit: Optional[str] The maximum size of the exported data in MB.
Size includes all files within the provided directory. By default, the data_size_limit will be set to 100MB.
- arrow: Optional[bool] If set to True, return logs as an Arrow Table. Else, returns Pandas DataFrame.
Returns
Tuple[Union[pa.Table, pd.DataFrame, LogEntries], str] The logs and status.
def security_logs(self, limit: int) -> List[dict]:
This function is not available in this release.
def get_raw_logs( self, topic: str, start: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, limit: int = 100000, parse: bool = False, dataset: Optional[List[str]] = None, dataset_exclude: Optional[List[str]] = None, dataset_separator: Optional[str] = None, verbose: bool = False) -> Union[List[Dict[str, Any]], pandas.core.frame.DataFrame]:
Gets logs from Plateau for a particular time window without attempting
to convert them to Inference LogEntries. Logs can be returned as raw
strings, or the JSON can be parsed into lists and dicts.
Parameters
- topic str: The name of the topic to query
- start Optional[datetime]: The start of the time window
- end Optional[datetime]: The end of the time window
- limit int: The number of records to retrieve. Note retrieving many
records may be a performance bottleneck.
- parse bool: Whether to attempt to parse each string as a JSON object.
- verbose bool: Prints out info to help diagnose issues.
def get_raw_pipeline_inference_logs( self, topic: str, start: datetime.datetime, end: datetime.datetime, model_name: Optional[str] = None, limit: int = 100000, verbose: bool = False) -> List[Union[Dict[str, Any], pandas.core.frame.DataFrame]]:
Gets logs from Plateau for a particular time window and filters them for
the model specified.
Parameters
- topic str: The name of the topic to query
- start datetime: The start of the time window
- end datetime: The end of the time window
- model_name Optional[str]: The name of the specific model to filter on, if any
- limit int: The number of records to retrieve. Note retrieving many
records may be a performance bottleneck.
- verbose bool: Prints out info to help diagnose issues.
def get_pipeline_inference_dataframe( self, topic: str, start: datetime.datetime, end: datetime.datetime, model_name: Optional[str] = None, limit: int = 100000, verbose=False) -> pandas.core.frame.DataFrame:
def get_assay_results( self, assay_id: int, start: datetime.datetime, end: datetime.datetime) -> wallaroo.assay.AssayAnalysisList:
Gets the assay results for a particular time window, parses them, and returns an
AssayAnalysisList of AssayAnalysis.
Parameters
- assay_id int: The id of the assay we are looking for.
- start datetime: The start of the time window
- end datetime: The end of the time window
def build_assay( self, assay_name: str, pipeline: wallaroo.pipeline.Pipeline, model_name: str, baseline_start: datetime.datetime, baseline_end: datetime.datetime) -> wallaroo.assay_config.AssayBuilder:
Creates an AssayBuilder that can be used to configure and create
Assays.
Parameters
- assay_name str: Human friendly name for the assay
- pipeline Pipeline: The pipeline this assay will work on
- model_name str: The model that this assay will monitor
- baseline_start datetime: The start time for the inferences to
use as the baseline
- baseline_end datetime: The end time of the baseline window. Assay
windows start immediately after the baseline window and are run at
regular intervals continuously until the assay is deactivated or
deleted.
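The baseline window is simply a pair of datetimes; a minimal sketch of constructing a one-day window (the names and the commented build_assay call are illustrative):

```python
from datetime import datetime, timedelta, timezone

# Hypothetical one-day baseline window.
baseline_end = datetime(2022, 4, 19, tzinfo=timezone.utc)
baseline_start = baseline_end - timedelta(days=1)
print((baseline_end - baseline_start).days)  # 1

# builder = client.build_assay("drift-check", pipeline, "my-model",
#                              baseline_start, baseline_end)
```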
def upload_assay(self, config: wallaroo.assay_config.AssayConfig) -> int:
Creates an assay in the database.
Parameters
- config AssayConfig: The configuration for the assay to create.
Returns
The identifier (int) for the assay that was created.
def list_assays(self) -> List[wallaroo.assay.Assay]:
List all assays on the platform.
Returns
A list of all assays on the platform.
def create_tag(self, tag_text: str) -> wallaroo.tag.Tag:
Create a new tag with the given text.
def create_workspace(self, workspace_name: str) -> wallaroo.workspace.Workspace:
Create a new workspace with the current user as its first owner.
Parameters
- str workspace_name: Name of the workspace, must be composed of ASCII
alpha-numeric characters plus dash (-)
def list_workspaces(self) -> List[wallaroo.workspace.Workspace]:
List all workspaces on the platform which this user has permission to see.
Returns
A list of all workspaces on the platform.
def set_current_workspace( self, workspace: wallaroo.workspace.Workspace) -> wallaroo.workspace.Workspace:
Set the current workspace. Any calls involving pipelines or models will use the given workspace from then on.
def get_current_workspace(self) -> wallaroo.workspace.Workspace:
Return the current workspace. See also set_current_workspace.
def invite_user(self, email, password=None):
def get_topic_name(self, pipeline_pk_id: int) -> str:
def shim_token(self, token_data: wallaroo.auth.TokenData):
def convert_model( self, path: Union[str, pathlib.Path], source_type: wallaroo.ModelConversion.ModelConversionSource, conversion_arguments: Union[wallaroo.ModelConversion.ConvertKerasArguments, wallaroo.ModelConversion.ConvertSKLearnArguments, wallaroo.ModelConversion.ConvertXGBoostArgs]) -> wallaroo.model.Model:
Given an inbound source model, a model type (xgboost, keras, sklearn), and conversion arguments, converts the model to ONNX and adds it to the available models for a pipeline.
Parameters
- Union[str, pathlib.Path] path: The path to the model to convert, i.e. the source model.
- ModelConversionSource source: The origin model type i.e. keras, sklearn or xgboost.
- ModelConversionArguments conversion_arguments: A structure representing the arguments for converting a specific model type.
Returns
An instance of the Model being converted to Onnx.
Raises
- ModelConversionGenericException: On a generic failure, please contact our support for further assistance.
- ModelConversionFailure: Failure in converting the model type.
- ModelConversionUnsupportedType: Raised when the source type passed is not supported.
- ModelConversionSourceFileNotPresent: Raised when the passed source file does not exist.
def list_orchestrations(self):
List all Orchestrations in the current workspace.
Returns
A List containing all Orchestrations in the current workspace.
def upload_orchestration( self, bytes_buffer: Optional[bytes] = None, path: Optional[str] = None, name: Optional[str] = None, file_name: Optional[str] = None):
Upload a file to be packaged and used as an Orchestration.
The uploaded artifact must be a ZIP file which contains:
- User code. If main.py exists, then that will be used as the task entrypoint. Otherwise, the first main.py found in any subdirectory will be used as the entrypoint.
- Optional: A standard Python requirements.txt for any dependencies to be provided in the task environment. The Wallaroo SDK will already be present and should not be mentioned. Multiple requirements.txt files are not allowed.
- Optional: Any other artifacts desired for runtime, including data or code.
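The layout above can be assembled with the standard zipfile module; the file contents here are placeholders:

```python
import io
import zipfile

# Build an in-memory ZIP with the required main.py entrypoint and an
# optional requirements.txt (the Wallaroo SDK itself is not listed).
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w") as zf:
    zf.writestr("main.py", "print('task running')\n")
    zf.writestr("requirements.txt", "pandas\n")

# Re-open to verify the archive's contents.
with zipfile.ZipFile(io.BytesIO(buf.getvalue())) as zf:
    print(sorted(zf.namelist()))  # ['main.py', 'requirements.txt']

# client.upload_orchestration(bytes_buffer=buf.getvalue(), file_name="task.zip")
```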
Parameters
- Optional[str] path: The path to the file on your filesystem that will be uploaded as an Orchestration.
- Optional[bytes] bytes_buffer: The raw bytes to upload to be used as the Orchestration. Cannot be used with the path param.
- Optional[str] name: An optional descriptive name for this Orchestration.
- Optional[str] file_name: An optional filename to describe your Orchestration when using the bytes_buffer param. Ignored when path is used.
Returns
The Orchestration that was uploaded.
Raises
- OrchestrationUploadFailed: If a server-side error prevented the upload from succeeding.
def list_tasks(self, killed: bool = False):
List all Tasks in the current Workspace.
Returns
A List containing Task objects.
def get_task_by_id(self, task_id: str):
Retrieve a Task by its ID.
Parameters
- str task_id: The ID of the Task to retrieve.
Returns
A Task object.
def in_task(self) -> bool:
Determines if this code is inside an orchestration task.
Returns
True if running in a task.
def task_args(self) -> Dict[Any, Any]:
When running inside a task (see in_task()), obtain the arguments passed to the task.
Returns
Dict of the arguments
def list_connections(self) -> wallaroo.connection.ConnectionList:
List all Connections defined in the platform.
Returns
List of Connections in the whole platform.
def get_connection(self, name: str) -> wallaroo.connection.Connection:
Retrieves a Connection by its name.
Returns
Connection to an external data source.
def create_connection( self, name: str, connection_type: str, details: Dict[str, Any]) -> wallaroo.connection.Connection:
Creates a Connection with the given name, type, and type-specific details.
Returns
Connection to an external data source.
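The details dict is type-specific; the keys below are purely illustrative, not a documented schema:

```python
# Hypothetical details for a database-style connection.
details = {
    "host": "db.example.com",
    "port": 5432,
    "database": "inference_inputs",
}
print(sorted(details))  # ['database', 'host', 'port']

# conn = client.create_connection("my-db", "postgres", details)
```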
def mlops(self) -> wallaroo.wallaroo_ml_ops_api_client.client.AuthenticatedClient: