Retail: Data Connections

How to use Wallaroo Connections to store external data configurations

Statsmodel Forecast with Wallaroo Features: Data Connection

Wallaroo Connections are definitions set by MLOps engineers that are used by other Wallaroo users for connection information to a data source.

This provides MLOps engineers a method of creating and updating connection information for data stores: databases, Kafka topics, etc. Wallaroo Connections are composed of three main parts:

  • Name: The unique name of the connection.
  • Type: A user defined string that designates the type of connection. This is used to organize connections.
  • Details: Details are a JSON object containing the information needed to make the connection. This can include data sources, authentication tokens, etc.
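
As a rough illustration of the three parts above, the sketch below models a connection as plain Python data. The ConnectionSpec class and the is_json_serializable helper are hypothetical names used only for this example and are not part of the Wallaroo SDK; the point is that the details must survive encoding as JSON.

```python
import json
from dataclasses import dataclass, field


# Hypothetical stand-in for illustration only; not a Wallaroo SDK class.
@dataclass
class ConnectionSpec:
    name: str   # unique name of the connection
    type: str   # user-defined label used to organize connections
    details: dict = field(default_factory=dict)  # JSON-compatible settings


def is_json_serializable(details):
    """Return True if details can be encoded as JSON."""
    try:
        json.dumps(details)
        return True
    except (TypeError, ValueError):
        return False


spec = ConnectionSpec(
    name="example-csv-source",                        # made-up name
    type="HTTP",
    details={"url": "https://example.com/data.csv"},  # placeholder URL
)
print(is_json_serializable(spec.details))          # True
print(is_json_serializable({"handle": object()}))  # False
```

Anything that cannot be expressed as JSON, such as an open file handle or a live database cursor, does not belong in the details.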

Wallaroo Connections only store the connection information that other processes use to create and use external connections. The user still has to provide the libraries and other elements needed to actually make and use the connection.

The primary advantage is that Wallaroo Connections let scripts and other code retrieve the connection details directly from their Wallaroo instance, then refer to those details. Users don’t need to know what those details actually are; they refer to them in their code, which makes the code more flexible.
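
This decoupling can be sketched as follows. The function depends only on a details dictionary and a fetch callable, so the stored URL can change without touching the code. Both load_rows and fake_fetch are hypothetical helpers for this example, and the CSV text is made-up placeholder data; in the tutorial, the dictionary would come from the connection's details() method and the fetch would be a real HTTP request.

```python
import csv
from io import StringIO


def load_rows(details, fetch):
    """Fetch CSV text from the URL stored in details and parse it into dicts."""
    url = details["url"]   # the location is read from the details, not hard-coded
    text = fetch(url)      # fetch is any callable that maps a URL to text
    return list(csv.DictReader(StringIO(text)))


# Stand-in for a real HTTP call so the sketch runs offline.
def fake_fetch(url):
    return "date,count\nd1,10\nd2,20\n"


details = {"url": "https://example.com/data.csv"}  # placeholder details
rows = load_rows(details, fake_fetch)
print(rows[0]["count"])  # 10
```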

For this step, we will retrieve the inference data from a CSV file hosted in a GitHub repository and use it to generate a forecast. This will use the Wallaroo Connection feature to create a Connection, assign it to our workspace, then perform our inferences by using the Connection details to retrieve the file.

Prerequisites

  • A Wallaroo instance version 2023.2.1 or greater.

Statsmodel Forecast Connection Steps

Import Libraries

The first step is to import the libraries that we will need.

import json
import os
import datetime

import wallaroo
from wallaroo.object import EntityNotFoundError
from wallaroo.framework import Framework

# used to display dataframe information without truncating
from IPython.display import display
import pandas as pd
import numpy as np

pd.set_option('display.max_colwidth', None)

import time
import pyarrow as pa

## convenience functions from the previous notebooks
## these functions assume your connection to wallaroo is called wl

# return the workspace called <name>, or create it if it does not exist.
# this function assumes your connection to wallaroo is called wl
def get_workspace(name):
    workspace = None
    for ws in wl.list_workspaces():
        if ws.name() == name:
            workspace = ws
    if workspace is None:
        workspace = wl.create_workspace(name)
    return workspace

# pull a single datum from a data frame 
# and convert it to the format the model expects
def get_singleton(df, i):
    singleton = df.iloc[i,:].to_numpy().tolist()
    sdict = {'tensor': [singleton]}
    return pd.DataFrame.from_dict(sdict)

# pull a batch of data from a data frame
# and convert to the format the model expects
def get_batch(df, first=0, nrows=1):
    last = first + nrows
    batch = df.iloc[first:last, :].to_numpy().tolist()
    return pd.DataFrame.from_dict({'tensor': batch})

# Translate a column from a dataframe into a single array
# used for the Statsmodel forecast model

def get_singleton_forecast(df, field):
    singleton = pd.DataFrame({field: [df[field].values.tolist()]})
    return singleton

# Get the most recent version of a model in the workspace
# Assumes that the most recent version is the last in the list of versions.
# wl.get_current_workspace().models() returns a list of models in the current workspace

def get_model(mname):
    modellist = wl.get_current_workspace().models()
    model = [m.versions()[-1] for m in modellist if m.name() == mname]
    if len(model) <= 0:
        raise KeyError(f"model {mname} not found in this workspace")
    return model[0]

# get a pipeline by name in the workspace
def get_pipeline(pname):
    plist = wl.get_current_workspace().pipelines()
    pipeline = [p for p in plist if p.name() == pname]
    if len(pipeline) <= 0:
        raise KeyError(f"pipeline {pname} not found in this workspace")
    return pipeline[0]

Connect to the Wallaroo Instance

The next step is to connect to Wallaroo through the Wallaroo client. The Python library is included in the Wallaroo install and available through the JupyterHub interface provided with your Wallaroo environment.

This is accomplished using the wallaroo.Client() command, which provides a URL to grant the SDK permission to your specific Wallaroo environment. When displayed, enter the URL into a browser and confirm permissions. Store the connection into a variable that can be referenced later.

If logging into the Wallaroo instance through the internal JupyterHub service, use wl = wallaroo.Client(). For more information on Wallaroo Client settings, see the Client Connection guide.

# Login through local Wallaroo instance

wl = wallaroo.Client()

Set Configurations

The following sets the workspace, model name, and pipeline used for this example. If the workspace or pipeline already exists, it will be assigned for use in this example. If it does not exist, it will be created based on the names listed below.

Workspace names must be unique. To allow this tutorial to run in the same Wallaroo instance for multiple users, set the suffix variable or share the workspace with other users.
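
One way to build per-user names is sketched below, assuming that appending a short lowercase suffix fits your naming rules. random_suffix and with_suffix are hypothetical helpers, not SDK functions.

```python
import random
import string


def random_suffix(length=4, rng=random):
    """Return `length` random lowercase ASCII letters."""
    return "".join(rng.choice(string.ascii_lowercase) for _ in range(length))


def with_suffix(base, suffix):
    """Append the suffix to a base name; an empty suffix leaves it unchanged."""
    return f"{base}-{suffix}" if suffix else base


suffix = random_suffix()
print(with_suffix("forecast-model-tutorial", suffix))
print(with_suffix("forecast-model-tutorial", ""))
```

A random suffix changes on every run, so reuse one fixed suffix across notebooks when later steps need to find a workspace created earlier.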

# retrieve the workspace, pipeline and model

import string
import random

suffix= ''.join(random.choice(string.ascii_lowercase) for i in range(4))

workspace_name = f'forecast-model-tutorial'

workspace = get_workspace(workspace_name)

wl.set_current_workspace(workspace)

control_model_name = 'forecast-control-model'

bike_day_model = get_model(control_model_name)

pipeline_name = 'forecast-tutorial-pipeline'

pipeline = get_pipeline(pipeline_name)

Deploy the Pipeline

Let’s set the model step to our single model pipeline, and perform a sample inference with our current data.

# Set pipeline step and deploy

pipeline.undeploy()
pipeline.clear()
pipeline.add_model_step(bike_day_model)
pipeline.deploy()
pipeline.steps()
[{'ModelInference': {'models': [{'name': 'forecast-control-model', 'version': 'd9af417f-29c3-49b1-9cad-a930779825d2', 'sha': '98b5f0911f608fdf9052b1b6db95c89a2c77c4b10d8f64a6d27df846ac616eb1'}]}}]

# sample inference from previous code here

sample_count = pd.read_csv('../data/test_data.csv')
inference_df = get_singleton_forecast(sample_count.loc[2:22], 'count')

results = pipeline.infer(inference_df)
display(results)
(index)             0
time                2023-08-03 15:33:47.771
in.count            [1349, 1562, 1600, 1606, 1510, 959, 822, 1321, 1263, 1162, 1406, 1421, 1248, 1204, 1000, 683, 1650, 1927, 1543, 981, 986]
out.forecast        [1278, 1295, 1295, 1295, 1295, 1295, 1295]
out.weekly_average  [1292.5714285714287]
check_failures      0
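
The selection sample_count.loc[2:22] is label-based, and label slices in pandas include both endpoints, so with the default integer index it picks 21 rows, matching the 21 values shown in in.count. The stdlib-only sketch below mirrors that window and the single-record shape built by get_singleton_forecast. forecast_window is a hypothetical helper and the values are placeholders, not the tutorial data.

```python
def forecast_window(values, first_label, last_label):
    """Mimic an inclusive label slice on a default 0..n-1 index."""
    return values[first_label:last_label + 1]


counts = list(range(100, 130))           # placeholder data, 30 values
window = forecast_window(counts, 2, 22)  # same bounds as .loc[2:22]

# One record whose single field holds the whole window as a list,
# mirroring the one-row, one-column frame sent to the model.
record = {"count": window}

print(len(window))            # 21
print(window[0], window[-1])  # 102 122
```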

Create the Connection

For this demonstration, the connection is set to a specific file in a GitHub repository. The connection details can be anything that can be stored in JSON: connection URLs, tokens, etc.

This connection will set a URL to pull a file from GitHub, then use the file contents to perform an inference.

Wallaroo connections are created through the Wallaroo Client create_connection(name, type, details) method. See the Wallaroo SDK Essentials Guide: Data Connections Management guide for full details.

Note that connection names must be unique across the Wallaroo instance - if needed, use random characters at the end to make sure your connection doesn’t have the same name as a previously created connection.
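
A collision check along these lines can help pick a free name. This is a sketch over a plain list of names; how the existing names are collected from your instance is left out, and next_free_name is a hypothetical helper that uses a numeric suffix instead of random characters.

```python
def next_free_name(base, existing_names, max_tries=1000):
    """Return base if unused, else base-2, base-3, ... until one is free."""
    taken = set(existing_names)
    if base not in taken:
        return base
    for n in range(2, max_tries + 2):
        candidate = f"{base}-{n}"
        if candidate not in taken:
            return candidate
    raise RuntimeError(f"no free name found for {base!r}")


existing = ["bike-rentals-csv", "bike-rentals-csv-2"]
print(next_free_name("bike-rentals-csv", existing))  # bike-rentals-csv-3
print(next_free_name("sales-db", existing))          # sales-db
```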

# set the connection information for other steps
# add a suffix to the name if it must be unique across the instance

forecast_connection_input_name = f'bike-rentals-csv'
forecast_connection_input_type = "HTTP"
forecast_connection_input_argument = { 
    "url": "https://raw.githubusercontent.com/WallarooLabs/Tutorials/jch-forecast-tutorials/Forecasting/Retail-CPG/data/test_data.csv"
    }

wl.create_connection(forecast_connection_input_name, forecast_connection_input_type, forecast_connection_input_argument)
Field              Value
Name               bike-rentals-csv-john
Connection Type    HTTP
Details            *****
Created At         2023-08-03T15:40:28.804640+00:00
Linked Workspaces  []

List Connections

Connections for the entire Wallaroo instance are listed with the Wallaroo Client list_connections() method.

# list the connections here

wl.list_connections()
name                          connection type  details  created at                        linked workspaces
statsmodel-bike-rentals-john  HTTP             *****    2023-08-02T20:38:34.662841+00:00  ['forecast-model-tutorialjohn']
bike-rentals-csv-john         HTTP             *****    2023-08-03T15:40:28.804640+00:00  []

Get Connection by Name

To retrieve a previously created connection, assign it to a variable with the Wallaroo Client get_connection(connection_name) method, then display the connection. When a connection is displayed, its details are hidden; retrieve them with connection.details(). Here’s an example:

myconnection = wl.get_connection("My amazing connection")
display(myconnection)
display(myconnection.details())

Use that code to retrieve your new connection.

# get the connection by name

connection = wl.get_connection(forecast_connection_input_name)
connection
Field              Value
Name               bike-rentals-csv-john
Connection Type    HTTP
Details            *****
Created At         2023-08-03T15:40:28.804640+00:00
Linked Workspaces  []

Add Connection to Workspace

We’ll now add the connection to our workspace so it can be retrieved by other workspace users. The method Workspace add_connection(connection_name) adds a Data Connection to a workspace. The method Workspace list_connections() displays a list of connections attached to the workspace.

workspace.add_connection(forecast_connection_input_name)
workspace.list_connections()
name                          connection type  details  created at                        linked workspaces
statsmodel-bike-rentals-john  HTTP             *****    2023-08-02T20:38:34.662841+00:00  ['forecast-model-tutorialjohn']
bike-rentals-csv-john         HTTP             *****    2023-08-03T15:40:28.804640+00:00  ['forecast-model-tutorialjohn']

Retrieve Connection from Workspace

To simulate a data scientist’s procedural flow, we’ll now retrieve the connection from the workspace. Specific connections are retrieved by specifying their position in the returned list.

For example, if we have two connections in a workspace and we want the second one, we can assign it to a variable with workspace.list_connections()[1].

Create a new variable and retrieve the connection we just assigned to the workspace.

forecast_connection = workspace.list_connections()[-1]
display(forecast_connection)
Field              Value
Name               bike-rentals-csv-john
Connection Type    HTTP
Details            *****
Created At         2023-08-03T15:40:28.804640+00:00
Linked Workspaces  ['forecast-model-tutorialjohn']
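
Positional lookup depends on the order of the returned list, which can shift as connections are added to the workspace. A name-based lookup is one alternative, sketched here in the same style as the get_pipeline helper. FakeConnection is a hypothetical stand-in so the example runs without a Wallaroo instance, and the sketch assumes each connection object exposes a name() accessor; check that against your SDK version before relying on it.

```python
class FakeConnection:
    """Minimal stand-in so the sketch runs without a Wallaroo instance."""

    def __init__(self, name, details):
        self._name = name
        self._details = details

    def name(self):
        return self._name

    def details(self):
        return self._details


def find_connection(connections, wanted_name):
    """Return the first connection whose name matches, or raise KeyError."""
    for conn in connections:
        if conn.name() == wanted_name:
            return conn
    raise KeyError(f"connection {wanted_name!r} not found")


connections = [
    FakeConnection("first", {"url": "https://example.com/a.csv"}),   # placeholder
    FakeConnection("second", {"url": "https://example.com/b.csv"}),  # placeholder
]
print(find_connection(connections, "second").details()["url"])
```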

Run Inference with Connection

We’ll now retrieve sample data through the Wallaroo connection, and perform a sample inference. The connection details are retrieved through the Connection details() method. Use them to retrieve the CSV file and convert it to a DataFrame, and use it with our sample model.

Or create a new connection with your own data. Here’s some sample code for retrieving a CSV file from a URL.

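import requests
from io import StringIO

response = requests.get('https://myurl.com/csvsample.csv')

# convert any literal "\n" sequences in the response into real line breaks
csv_text = response.text
csv_new = csv_text.replace('\\n', '\n')

csvStringIO = StringIO(csv_new)
df = pd.read_csv(csvStringIO)

# display the URL stored in the connection details
display(forecast_connection.details()['url'])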

import requests

response = requests.get(
                    forecast_connection.details()['url']
                )

csv_text = response.text
csv_new = csv_text.replace('\\n', '\n')

from io import StringIO
csvStringIO = StringIO(csv_new)

# print(csv_new)

sample_count = pd.read_csv(csvStringIO)
inference_df = get_singleton_forecast(sample_count.loc[2:22], 'count')

results = pipeline.infer(inference_df)
display(results)
'https://raw.githubusercontent.com/WallarooLabs/Tutorials/jch-forecast-tutorials/Forecasting/Retail-CPG/data/test_data.csv'
(index)             0
time                2023-08-03 15:56:59.299
in.count            [1349, 1562, 1600, 1606, 1510, 959, 822, 1321, 1263, 1162, 1406, 1421, 1248, 1204, 1000, 683, 1650, 1927, 1543, 981, 986]
out.forecast        [1278, 1295, 1295, 1295, 1295, 1295, 1295]
out.weekly_average  [1292.5714285714287]
check_failures      0
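
One line in the cell above is easy to misread: csv_text.replace('\\n', '\n') turns literal backslash-n character pairs into real line breaks and leaves text that already contains real newlines unchanged. The stdlib-only sketch below shows that behavior along with one way to pull the count column out of CSV text. normalize_newlines and parse_counts are hypothetical helpers, and the CSV strings are placeholder data.

```python
import csv
from io import StringIO


def normalize_newlines(text):
    """Replace literal backslash-n pairs with real newline characters."""
    return text.replace("\\n", "\n")


def parse_counts(csv_text, column="count"):
    """Parse CSV text and return the named column as a list of ints."""
    reader = csv.DictReader(StringIO(normalize_newlines(csv_text)))
    return [int(row[column]) for row in reader]


escaped = "date,count\\nd1,10\\nd2,20"  # backslash-n pairs, no real newlines
plain = "date,count\nd1,10\nd2,20\n"    # already has real newlines

print(parse_counts(escaped))  # [10, 20]
print(parse_counts(plain))    # [10, 20]
```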

You have now walked through creating a Wallaroo connection and using it to retrieve data for an inference.

Congratulations!

In this tutorial you have

  • Deployed a single step forecast pipeline and sent data to it.
  • Created a new Wallaroo connection.
  • Assigned the connection to a workspace.
  • Retrieved the connection from the workspace.
  • Used the data connection to retrieve information from outside of Wallaroo and used it for an inference.

Great job!

Cleaning up.

Now that the tutorial is complete, don’t forget to undeploy your pipeline to free up the resources.

pipeline.undeploy()
name          forecast-tutorial-pipeline
created       2023-08-02 15:50:59.480547+00:00
last_updated  2023-08-02 20:22:05.264816+00:00
deployed      False
tags
versions      a109a040-c8f2-46dc-8c0b-373ae10d4fa0, dcaec327-1358-42a7-88de-931602a42a72, debc509f-9481-464b-af7f-5c3138a9cdb4, b0d167aa-cc98-440a-8e85-1ae3f089745a, d9e69c40-c83b-48af-b6b9-caafcb85f08b, 186ffdd2-3a8f-40cc-8362-13cc20bd2f46, 535e6030-ebe5-4c79-b5cd-69b161637a99, c5c0218a-800b-4235-8767-64d18208e68a, 4559d934-33b0-4872-a788-4ef27f554482, 94d3e20b-add7-491c-aedd-4eb094a8aebf, ab4e58bf-3b75-4bf6-b6b3-f703fe61e7af, 3773f5c5-e4c5-4e46-a839-6945af15ca13, 3abf03dd-8eab-4a8d-8432-aa85a30c0eda, 5ec5e8dc-7492-498b-9652-b3733e4c87f7, 1d89287b-4eff-47ec-a7bb-8cedaac1f33f
steps         forecast-control-model
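
To make cleanup harder to forget, the deploy and undeploy calls can be paired in a context manager. This is a sketch, not an SDK feature: deployed is a hypothetical helper that relies only on the deploy() and undeploy() methods used in this tutorial, and FakePipeline stands in for a real pipeline so the example runs offline.

```python
from contextlib import contextmanager


@contextmanager
def deployed(pipeline):
    """Deploy on entry and always undeploy on exit, even after an error."""
    pipeline.deploy()
    try:
        yield pipeline
    finally:
        pipeline.undeploy()


class FakePipeline:
    """Stand-in that records calls instead of using cluster resources."""

    def __init__(self):
        self.calls = []

    def deploy(self):
        self.calls.append("deploy")

    def undeploy(self):
        self.calls.append("undeploy")


pipe = FakePipeline()
try:
    with deployed(pipe):
        raise ValueError("inference failed")  # simulated failure
except ValueError:
    pass
print(pipe.calls)  # ['deploy', 'undeploy']
```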