wallaroo.client


class Client:

Client handle to a Wallaroo platform instance.

Objects of this class serve as the entrypoint to Wallaroo platform functionality.

Client( api_endpoint: Optional[str] = None, request_timeout: Optional[int] = None, auth_type: Optional[str] = None, gql_client: Optional[gql.client.Client] = None, interactive: Optional[bool] = None, time_format: str = '%Y-%d-%b %H:%M:%S', config: Optional[dict] = None)

Create a Client handle.

Parameters
  • Optional[str] api_endpoint: Host/port of the platform API endpoint. If not provided, the value of the WALLAROO_URL environment variable will be used.
  • Optional[int] request_timeout: Max timeout of web requests, in seconds
  • Optional[str] auth_type: Authentication type to use. Can be one of: "none", "sso", "user_password".
  • Optional[bool] interactive: If provided and True, some calls will print additional human information, or won't when False. If not provided, interactive defaults to True if running inside Jupyter and False otherwise.
  • str time_format: Preferred strftime format string for displaying timestamps in a human context.
@staticmethod
def get_urls( auth_type: Optional[str] = None, api_endpoint: Optional[str] = None) -> Tuple[Optional[str], Optional[str]]:

Method to calculate the auth values specified as defaults, as params or in ENV vars. Made static to be testable without reaching out to SSO, etc.

def list_tags(self) -> wallaroo.tag.Tags:

List all tags on the platform.

Returns

A list of all tags on the platform.

def list_models( self, workspace_id: Optional[int] = None, workspace_name: Optional[str] = None) -> wallaroo.model.ModelList:

List all models in the current or a specified workspace.

Parameters
  • workspace_id: Optional[int]: The workspace id to search in. If not provided, the current workspace id is used.
  • workspace_name: Optional[str]: The workspace name to search in. If not provided, the current workspace name is used.
Returns

A list of all models in the workspace.

def list_deployments(self) -> List[wallaroo.deployment.Deployment]:

List all deployments (active or not) on the platform.

Returns

A list of all deployments on the platform.

def search_pipelines( self, search_term: Optional[str] = None, deployed: Optional[bool] = None, created_start: Optional[Datetime] = None, created_end: Optional[Datetime] = None, updated_start: Optional[Datetime] = None, updated_end: Optional[Datetime] = None, workspace_id: Optional[int] = None, workspace_name: Optional[str] = None) -> wallaroo.pipeline_version.PipelineVersionList:

Search for pipelines. All parameters are optional, in which case the result is the same as list_pipelines(). All times are strings to be parsed by datetime.isoformat. Example:

 myclient.search_pipelines(created_end='2022-04-19 13:17:59+00:00', search_term="foo")
Parameters
  • str search_term: Will be matched against tags and model names. Example: "footag123".
  • bool deployed: Pipeline was deployed or not
  • str created_start: Pipeline was created at or after this time
  • str created_end: Pipeline was created at or before this time
  • str updated_start: Pipeline was updated at or before this time
  • str updated_end: Pipeline was updated at or before this time
  • int workspace_id: The workspace id to search in
  • str workspace_name: The workspace name to search in
Returns

A list of pipeline versions matching the search criteria.

def search_pipeline_versions( self, search_term: Optional[str] = None, deployed: Optional[bool] = None, created_start: Optional[Datetime] = None, created_end: Optional[Datetime] = None, updated_start: Optional[Datetime] = None, updated_end: Optional[Datetime] = None) -> wallaroo.pipeline_version.PipelineVersionList:

Search for pipeline versions. All parameters are optional. All times are strings to be parsed by datetime.isoformat. Example:

myclient.search_pipeline_versions(created_end='2022-04-19 13:17:59+00:00', search_term="foo")

Parameters
  • str search_term: Will be matched against tags and model names. Example: "footag123".
  • bool deployed: Pipeline was deployed or not
  • str created_start: Pipeline was created at or after this time
  • str created_end: Pipeline was created at or before this time
  • str updated_start: Pipeline was updated at or before this time
  • str updated_end: Pipeline was updated at or before this time
Returns

A list of pipeline versions matching the search criteria.

def search_models( self, search_term: Optional[str] = None, uploaded_time_start: Optional[Datetime] = None, uploaded_time_end: Optional[Datetime] = None, workspace_id: Optional[int] = None, workspace_name: Optional[str] = None) -> wallaroo.model_version.ModelVersionList:

Search all models you have access to.

Parameters
  • search_term: Optional[str]: Searches the following metadata: names, shas, versions, file names, and tags
  • uploaded_time_start: Optional[Datetime]: Inclusive time of upload
  • uploaded_time_end: Optional[Datetime] Inclusive time of upload
  • workspace_id: Optional[int]: The workspace id to search in
  • workspace_name: Optional[str]: The workspace name to search in
Returns

ModelVersionList

def search_model_versions( self, search_term: Optional[str] = None, uploaded_time_start: Optional[Datetime] = None, uploaded_time_end: Optional[Datetime] = None, workspace_id: Optional[int] = None, workspace_name: Optional[str] = None) -> wallaroo.model_version.ModelVersionList:

Search all model versions you have access to. Example:

client.search_model_versions(search_term="my_model")

Parameters
  • search_term: Optional[str]: Searches the following metadata: names, shas, versions, file names, and tags
  • uploaded_time_start: Optional["Datetime"]: Inclusive time of upload
  • uploaded_time_end: Optional["Datetime"]: Inclusive time of upload
  • workspace_id: Optional[int]: The workspace id to search in
  • workspace_name: Optional[str]: The workspace name to search in
Returns

ModelVersionList

def get_user_by_email(self, email: str) -> Optional[wallaroo.user.User]:

Find a user by email

def deactivate_user(self, email: str) -> None:

Deactivates an existing user of the platform

Deactivated users cannot log into the platform. Deactivated users do not count towards the number of allotted user seats from the license.

The Models and Pipelines owned by the deactivated user are not removed from the platform.

Parameters
  • str email: The email address of the user to deactivate.
Returns

None

def activate_user(self, email: str) -> None:

Activates an existing user of the platform that had been previously deactivated.

Activated users can log into the platform.

Parameters
  • str email: The email address of the user to activate.
Returns

None

def list_users(self) -> List[wallaroo.user.User]:

List of all Users on the platform

Returns

A list of all Users on the platform.

def upload_model( self, name: str, path: Union[str, pathlib.Path], framework: Optional[wallaroo.framework.Framework] = None, input_schema: Optional[pyarrow.lib.Schema] = None, output_schema: Optional[pyarrow.lib.Schema] = None, convert_wait: Optional[bool] = True, arch: Optional[wallaroo.engine_config.Architecture] = None, accel: Optional[wallaroo.engine_config.Acceleration] = None) -> wallaroo.model_version.ModelVersion:

Upload a model defined by a file as a new model variant.

Parameters
  • name: str The name of the model of which this is a variant. Names must be ASCII alpha-numeric characters or dash (-) only.
  • path: Union[str, pathlib.Path] Path of the model file to upload.
  • framework: Optional[Framework] Supported model frameworks. Use models from Framework Enum. Example: Framework.PYTORCH, Framework.TENSORFLOW
  • input_schema: Optional pa.Schema Input schema, required for flavors other than ONNX, Tensorflow, and Python
  • output_schema: Optional pa.Schema Output schema, required for flavors other than ONNX, Tensorflow, and Python
  • convert_wait: Optional bool Defaults to True. Specifies if method should return when conversion is over or not.
Returns

The created Model.

def generate_upload_model_api_command( self, base_url: str, name: str, path: Union[str, pathlib.Path], framework: Optional[wallaroo.framework.Framework] = None, input_schema: Optional[pyarrow.lib.Schema] = None, output_schema: Optional[pyarrow.lib.Schema] = None, arch: Optional[wallaroo.engine_config.Architecture] = None, accel: Optional[wallaroo.engine_config.Acceleration] = None) -> str:

Helper function to upload a large model via API in Wallaroo. It generates the equivalent CLI command to upload the model via API.

Parameters
  • base_url: str The base URL of the Wallaroo Cluster.
  • name: str The name of the model of which this is a variant. Names must be ASCII alpha-numeric characters or dash (-) only.
  • path: Union[str, pathlib.Path] Path of the model file to upload.
  • framework: Optional[Framework] Supported model frameworks. Use models from Framework Enum. Example: Framework.PYTORCH, Framework.TENSORFLOW
  • input_schema: Optional pa.Schema Input schema, required for flavors other than ONNX, Tensorflow, and Python
  • output_schema: Optional pa.Schema Output schema, required for flavors other than ONNX, Tensorflow, and Python
  • arch: Optional[Architecture] Supported architectures.
  • accel: Optional[Acceleration] Supported types of acceleration.
Returns

The upload_and_convert CLI command.

def register_model_image(self, name: str, image: str) -> wallaroo.model_version.ModelVersion:

Registers an MLFlow model as a new model.

Parameters
  • str model_name: The name of the model of which this is a variant. Names must be ASCII alpha-numeric characters or dash (-) only.
  • str image: Image name of the MLFlow model to register.
Returns

The created Model.

def get_model(self, name: str, version: Optional[str] = None):

Retrieves a model by name and optionally version from the current workspace.

Parameters
  • name: The name of the model.
  • version: The version of the model. If not provided, the latest version is returned.
Returns

The requested model. Raises: Exception: If the model with the given name does not exist. Exception: If the model with the given version does not exist.

def model_by_name( self, name: str, version: str, workspace_id: Optional[int] = None, workspace_name: Optional[str] = None) -> wallaroo.model_version.ModelVersion:

Fetch a Model by name.

Parameters
  • name: str: Name of the model, same as model_id.
  • version: str: Version string of the model
  • workspace_id: Optional[int]: The workspace id to search in
  • workspace_name: Optional[str]: The workspace name to search in
Returns

The Model with the corresponding model and variant name.

def model_version_by_name( self, model_class: str, model_name: str) -> wallaroo.model_version.ModelVersion:

Fetch a Model version by name.

Parameters
  • str model_class: Name of the model class.
  • str model_name: Name of the variant within the specified model class.
Returns

The Model with the corresponding model and variant name.

def deployment_by_name(self, deployment_name: str) -> wallaroo.deployment.Deployment:

Fetch a Deployment by name.

Parameters
  • str deployment_name: Name of the deployment.
Returns

The Deployment with the corresponding name.

def pipelines_by_name( self, pipeline_name: str, workspace_id: Optional[int] = None, workspace_name: Optional[str] = None) -> List[wallaroo.pipeline.Pipeline]:

Fetch Pipelines by name.

Parameters
  • str pipeline_name: Name of the pipeline.
  • int workspace_id: ID of the workspace. Defaults to None.
  • str workspace_name: Name of the workspace. Defaults to None.
Returns

The Pipeline with the corresponding name.

def list_pipelines( self, workspace_id: Optional[int] = None, workspace_name: Optional[str] = None) -> List[wallaroo.pipeline.Pipeline]:

List all pipelines on the platform.

Parameters
  • int workspace_id: ID of the workspace. Defaults to None.
  • str workspace_name: Name of the workspace. Defaults to None.
Returns

A list of all pipelines on the platform.

def get_pipeline( self, name: str, version: Optional[str] = None) -> wallaroo.pipeline.Pipeline:

Retrieves a pipeline by name and optional version from the current workspace.

Parameters
  • name: The name of the pipeline to retrieve.
  • version: The version of the pipeline to retrieve. Defaults to None.
Returns

Pipeline: The requested pipeline. Raises: Exception: If the pipeline with the given name is not found in the workspace. Exception: If the pipeline with the given version is not found in the workspace.

def build_pipeline(self, pipeline_name: str) -> wallaroo.pipeline.Pipeline:

Starts building a pipeline with the given pipeline_name, returning a :py:PipelineConfigBuilder:

When completed, the pipeline can be uploaded with .upload()

Parameters
  • pipeline_name string: Name of the pipeline, must be composed of ASCII alpha-numeric characters plus dash (-).
def get_logs( self, topic: str, limit: Optional[int] = None, start_datetime: Optional[datetime.datetime] = None, end_datetime: Optional[datetime.datetime] = None, dataset: Optional[List[str]] = None, dataset_exclude: Optional[List[str]] = None, dataset_separator: Optional[str] = None, directory: Optional[str] = None, file_prefix: Optional[str] = None, data_size_limit: Optional[str] = None, arrow: Optional[bool] = False) -> Tuple[Union[pyarrow.lib.Table, pandas.core.frame.DataFrame, NoneType], Optional[str]]:

Get logs for the given topic.

Parameters
  • topic: str The topic to get logs for.
  • limit: Optional[int] The maximum number of logs to return.
  • start_datetime: Optional[datetime] The start time to get logs for.
  • end_datetime: Optional[datetime] The end time to get logs for. :param dataset: Optional[List[str]] By default this is set to ["*"] which returns, ["time", "in", "out", "anomaly"]. Other available options - ["metadata"]
  • dataset_exclude: Optional[List[str]] If set, allows user to exclude parts of dataset.
  • dataset_separator: Optional[Union[Sequence[str], str]] If set to ".", return dataset will be flattened.
  • directory: Optional[str] If set, logs will be exported to a file in the given directory.
  • file_prefix: Optional[str] Prefix to name the exported file. Required if directory is set.
  • data_size_limit: Optional[str] The maximum size of the exported data in MB. Size includes all files within the provided directory. By default, the data_size_limit will be set to 100MB.
  • arrow: Optional[bool] If set to True, return logs as an Arrow Table. Else, returns Pandas DataFrame.
Returns

Tuple[Union[pa.Table, pd.DataFrame], str] The logs and status.

def get_raw_logs( self, topic: str, start: Optional[datetime.datetime] = None, end: Optional[datetime.datetime] = None, limit: int = 100000, dataset: Optional[List[str]] = None, dataset_exclude: Optional[List[str]] = None, dataset_separator: Optional[str] = None, verbose: bool = False) -> Union[List[Dict[str, Any]], pandas.core.frame.DataFrame]:

Gets logs from Plateau for a particular time window.

Parameters
  • topic: str The name of the topic to query
  • start: Optional[datetime] The start of the time window
  • end: Optional[datetime] The end of the time window
  • limit: int The number of records to retrieve. Note retrieving many records may be a performance bottleneck.
  • verbose: bool Prints out info to help diagnose issues.
Returns

List[Dict[str, Any]], pd.DataFrame The logs from given time window.

def get_raw_pipeline_inference_logs( self, topic: str, start: datetime.datetime, end: datetime.datetime, model_name: Optional[str] = None, limit: int = 100000, verbose: bool = False) -> List[Union[Dict[str, Any], pandas.core.frame.DataFrame]]:

Gets logs from Plateau for a particular time window and filters them for the model specified.

Parameters
  • pipeline_name: The name/pipeline_id of the pipeline to query
  • topic: The name of the topic to query
  • start: The start of the time window
  • end: The end of the time window
  • model_id: The name of the specific model to filter if any
  • limit: The number of records to retrieve. Note retrieving many records may be a performance bottleneck.
  • verbose: Prints out info to help diagnose issues.
Returns

The raw logs from given time window anf filtered by model_name.

def get_pipeline_inference_dataframe( self, topic: str, start: datetime.datetime, end: datetime.datetime, model_name: Optional[str] = None, limit: int = 100000, verbose=False) -> pandas.core.frame.DataFrame:
def get_assay_results( self, assay_id: Union[str, int], start: datetime.datetime, end: datetime.datetime, workspace_id: Optional[int] = None, workspace_name: Optional[str] = None) -> wallaroo.custom_types.IAssayAnalysisList:

Gets the assay results for a particular time window, parses them, and returns an List of AssayAnalysis.

Parameters
  • assay_id: int The id of the assay we are looking for.
  • start: datetime The start of the time window. If timezone info not set, uses UTC timezone by default.
  • end: datetime The end of the time window. If timezone info not set, uses UTC timezone by default.
  • workspace_id: Optional[int] The id of the workspace to retrieve the assay from.
  • workspace_name: Optional[str] The name of the workspace to retrieve the assay from.
Returns

List[IAssayAnalysis] The assay results for the given time window.

def build_assay( self, *, assay_name: str, pipeline: wallaroo.pipeline.Pipeline, iopath: str, model_name: Optional[str] = None, baseline_start: Optional[datetime.datetime] = None, baseline_end: Optional[datetime.datetime] = None, baseline_data: Optional[numpy.ndarray] = None) -> wallaroo.assay_config.AssayBuilder:

Creates an AssayBuilder that can be used to configure and create Assays.

Parameters
  • assay_name: str Human friendly name for the assay
  • pipeline: Pipeline The pipeline this assay will work on
  • iopath: str The path to the input or output of the model that this assay will monitor.
  • model_name: Optional[str] The name of the model to use for the assay.
  • baseline_start: Optional[datetime] The start time for the inferences to use as the baseline
  • baseline_end: Optional[datetime] The end time of the baseline window. the baseline. Windows start immediately after the baseline window and are run at regular intervals continuously until the assay is deactivated or deleted.
  • baseline_data: Optional[np.ndarray] Use this to load existing baseline data at assay creation time.
def upload_assay(self, config: wallaroo.assay_config.AssayConfig) -> int:

Creates an assay in the database.

Parameters
  • config AssayConfig: The configuration for the assay to create.
Returns

The identifier for the assay that was created. :rtype int

def list_assays( self, workspace_id: Optional[int] = None, workspace_name: Optional[str] = None) -> List[wallaroo.assay.Assay]:

List all assays on the platform.

Parameters
  • workspace_id: Optional[int] The identifier for the workspace to retrieve the assays from.
  • workspace_name: Optional[str] The name of the workspace to retrieve the assays from.
Returns

A list of all assays on the platform, unless filtered by workspace.

def get_assay_info( self, assay_id: Union[int, str], workspace_id: Optional[int] = None, workspace_name: Optional[str] = None) -> pandas.core.frame.DataFrame:

Get information about a specific assay.

Parameters
  • assay_id: int The identifier for the assay to retrieve.
  • workspace_id: Optional[int] The identifier for the workspace to retrieve the assay from.
  • workspace_name: Optional[str] The name of the workspace to retrieve the assay from.
Returns

The assay with the given identifier

def set_assay_active(self, assay_id: Union[int, str], active: bool) -> None:

Sets the state of an assay to active or inactive.

Parameters
  • assay_id: int The id of the assay to set the active state of.
  • active: bool The active state to set the assay to. Default is True.
def create_tag(self, tag_text: str) -> wallaroo.tag.Tag:

Create a new tag with the given text.

def create_workspace(self, workspace_name: str) -> wallaroo.workspace.Workspace:

Create a new workspace with the current user as its first owner.

Parameters
  • str workspace_name: Name of the workspace, must be composed of ASCII alpha-numeric characters plus dash (-)
def list_workspaces(self) -> List[wallaroo.workspace.Workspace]:

List all workspaces on the platform which this user has permission see.

Returns

A list of all workspaces on the platform.

def get_workspace( self, name: str, create_if_not_exist: Optional[bool] = False) -> Optional[wallaroo.workspace.Workspace]:

Get a workspace by name. If the workspace does not exist, create it.

Parameters
  • name: The name of the workspace to get.
  • create_if_not_exist: If set to True, create a new workspace if workspace by given name doesn't already exist. Set to False by default.
Returns

The workspace with the given name.

def set_current_workspace( self, workspace: wallaroo.workspace.Workspace) -> wallaroo.workspace.Workspace:

Any calls involving pipelines or models will use the given workspace from then on.

def get_current_workspace(self) -> wallaroo.workspace.Workspace:

Return the current workspace. See also set_current_workspace.

def invite_user(self, email, password=None):
def get_topic_name(self, pipeline_pk_id: int) -> str:
def list_orchestrations( self, workspace_id: Optional[int] = None, workspace_name: Optional[str] = None) -> List[wallaroo.orchestration.Orchestration]:

List all Orchestrations in the current workspace.

Parameters
  • workspace_id: Optional[int] The ID of the workspace to list Orchestrations from.
  • workspace_name: Optional[str] The name of the workspace to list Orchestrations from.
Returns

A List containing all Orchestrations in the current workspace.

def upload_orchestration( self, bytes_buffer: Optional[bytes] = None, path: Optional[str] = None, name: Optional[str] = None, file_name: Optional[str] = None):

Upload a file to be packaged and used as an Orchestration.

The uploaded artifact must be a ZIP file which contains:

  • User code. If main.py exists, then that will be used as the task entrypoint. Otherwise, the first main.py found in any subdirectory will be used as the entrypoint.
  • Optional: A standard Python requirements.txt for any dependencies to be provided in the task environment. The Wallaroo SDK will already be present and should not be mentioned. Multiple requirements.txt files are not allowed.
  • Optional: Any other artifacts desired for runtime, including data or code.
Parameters
  • Optional[str] path: The path to the file on your filesystem that will be uploaded as an Orchestration.
  • Optional[bytes] bytes_buffer: The raw bytes to upload to be used Orchestration. Cannot be used with the path param.
  • Optional[str] name: An optional descriptive name for this Orchestration.
  • Optional[str] file_name: An optional filename to describe your Orchestration when using the bytes_buffer param. Ignored when path is used.
Returns

The Orchestration that was uploaded. :raises OrchestrationUploadFailed If a server-side error prevented the upload from succeeding.

def list_tasks( self, killed: bool = False, workspace_id: Optional[int] = None, workspace_name: Optional[str] = None) -> List[wallaroo.task.Task]:

List all Tasks in the current Workspace.

Parameters
  • killed: bool If set to True, list all killed tasks.
  • workspace_id: Optional[int] The ID of the workspace to list Tasks from.
  • workspace_name: Optional[str] The name of the workspace to list Tasks from.
Returns

A List containing Task objects.

def get_task_by_id(self, task_id: str):

Retrieve a Task by its ID.

Parameters
  • str task_id: The ID of the Task to retrieve.
Returns

A Task object.

def in_task(self) -> bool:

Determines if this code is inside an orchestration task.

Returns

True if running in a task.

def task_args(self) -> Dict[Any, Any]:

When running inside a task (see in_task()), obtain arguments passed to the task.

Returns

Dict of the arguments

def list_connections(self) -> wallaroo.connection.ConnectionList:

List all Connections defined in the platform.

Returns

List of Connections in the whole platform.

def get_connection(self, name=<class 'str'>) -> wallaroo.connection.Connection:

Retrieves a Connection by its name.

Returns

Connection to an external data source.

def create_connection( self, name=<class 'str'>, connection_type=<class 'str'>, details=typing.Dict[str, typing.Any]) -> wallaroo.connection.Connection:

Creates a Connection with the given name, type, and type-specific details.

Returns

Connection to an external data source.

def create_model_registry( self, name: str, token: str, url: str, workspace_id: Optional[int] = None) -> wallaroo.model_registry.ModelRegistry:

Create a Model Registry connection in this workspace that can be reused across workspaces.

:param name str A descriptive name for this registry :param token str A Bearer token necessary for accessing this Registry. :param url str The root URL for this registry. It should NOT include /api/2.0/mlflow as part of it. :param workspace_id int The ID of the workspace to attach this registry to, i.e. client.get_current_workspace().id().

Returns

A ModelRegistry object.

def list_model_registries(self, workspace_id: Optional[int] = None):
def get_email_by_id(self, id: str):
def remove_edge(self, name: str):

Remove an edge to a published pipeline.

Parameters
  • str name: The name of the edge that will be removed. This is not limited to this pipeline.
def mlops(self) -> wallaroo.wallaroo_ml_ops_api_client.client.AuthenticatedClient: