OpenAI Inference Endpoint Deployment and Monitoring with Wallaroo
This tutorial and the assets can be downloaded as part of the Wallaroo Tutorials repository.
Wallaroo Deployment of Managed Inference Endpoint Models with OpenAI
The following tutorial demonstrates uploading, deploying, running inference on, and monitoring an LLM with Managed Inference Endpoints.
These models leverage LLMs deployed in other services, with Wallaroo providing a single source for inference requests, logging results, monitoring for hate/abuse/racism and other factors, and tracking model drift through Wallaroo assays.
Provided Models
The following models are provided:
- gpt35.zip: A Wallaroo BYOP model that uses OpenAI as a Managed Inference Endpoint.
- summarization_quality.zip: A Wallaroo BYOP model that scores the quality of the LLM response.
OpenAI Python Library Requirements
Wallaroo Custom Models (aka BYOP) artifacts include an optional requirements.txt file to set the Python libraries used when deploying the model. For Wallaroo 2024.4, the following library versions are required for LLMs with Managed Inference Endpoints with OpenAI contained within the BYOP framework.
Library | Required Version |
---|---|
httpx | httpx==0.27.2 |
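Before packaging the model, it can help to confirm that requirements.txt really carries the pin from the table above. The following is a minimal sketch of such a check; the helper names parse_pins and missing_pins are hypothetical and not part of the Wallaroo SDK.

```python
# Hypothetical helpers: confirm a requirements.txt body contains the expected pins.
REQUIRED_PINS = {"httpx": "0.27.2"}  # taken from the table above


def parse_pins(requirements_text: str) -> dict:
    """Return {package: version} for every `name==version` line."""
    pins = {}
    for raw_line in requirements_text.splitlines():
        line = raw_line.split("#", 1)[0].strip()  # drop comments and whitespace
        if "==" not in line:
            continue  # skip blank lines and unpinned entries
        name, version = line.split("==", 1)
        pins[name.strip().lower()] = version.strip()
    return pins


def missing_pins(requirements_text: str, required: dict = REQUIRED_PINS) -> list:
    """List the required pins that are absent or set to a different version."""
    found = parse_pins(requirements_text)
    return [
        f"{name}=={version}"
        for name, version in required.items()
        if found.get(name) != version
    ]


example_requirements = "httpx==0.27.2\n"
print(missing_pins(example_requirements))
```

An empty list means every required pin is present at the expected version.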
Prerequisites
This tutorial requires:
- Wallaroo 2024.1 and above
- Credentials for authenticating to OpenAI
Tutorial Steps
Import Library
The following libraries are used to upload and perform inferences on the LLM with Managed Inference Endpoints.
import wallaroo
from wallaroo.framework import Framework
import pyarrow as pa
import polars as pl
Connect to the Wallaroo Instance
A connection to Wallaroo is opened through the Wallaroo SDK client. The Python library is included in the Wallaroo install and available through the Jupyter Hub interface provided with your Wallaroo environment.
This is accomplished using the wallaroo.Client() command, which provides a URL to grant the SDK permission to your specific Wallaroo environment. When displayed, enter the URL into a browser and confirm permissions. Store the connection into a variable that can be referenced later.
If logging into the Wallaroo instance through the internal JupyterHub service, use wl = wallaroo.Client(). For more information on Wallaroo Client settings, see the Client Connection guide.
The request_timeout flag is used for Wallaroo BYOP models where the file size may require additional time to complete the upload process.
wl = wallaroo.Client(request_timeout=480)
Set Workspace
The following creates or connects to an existing workspace by name and sets it as the current workspace. For more details on Wallaroo workspaces, see Wallaroo Workspace Management Guide.
For this tutorial, the workspace name openai is used.
workspace = wl.get_workspace('openai', create_if_not_exist=True)
_ = wl.set_current_workspace(workspace)
LLM with Managed Inference Endpoint Model Code
The Wallaroo BYOP model gpt35.zip contains the following artifacts:
- main.py: Python script that controls the behavior of the model.
- requirements.txt: Python requirements file that sets the Python libraries used.
- secret_key.json: Secret key file that contains the OpenAI API key.
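As a rough illustration of how these three artifacts could be bundled, the sketch below writes placeholder files and zips them. The function name package_byop and the flat archive layout (all files at the archive root) are assumptions, not a documented Wallaroo packaging procedure; the API_SECRET key name matches what the model code in this tutorial reads.

```python
import json
import tempfile
import zipfile
from pathlib import Path


def package_byop(source_dir: Path, archive_path: Path) -> list:
    """Zip the three artifacts at the archive root and return the stored names."""
    artifact_names = ["main.py", "requirements.txt", "secret_key.json"]
    with zipfile.ZipFile(archive_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for name in artifact_names:
            archive.write(source_dir / name, arcname=name)
    with zipfile.ZipFile(archive_path) as archive:
        return sorted(archive.namelist())


with tempfile.TemporaryDirectory() as tmp:
    work = Path(tmp)
    # Placeholder contents; a real package holds the actual model script.
    (work / "main.py").write_text("# model code goes here\n")
    (work / "requirements.txt").write_text("httpx==0.27.2\n")
    # 'API_SECRET' is the key name read by the model's create method.
    (work / "secret_key.json").write_text(
        json.dumps({"API_SECRET": "<your OpenAI API key>"})
    )
    packaged = package_byop(work, work / "gpt35.zip")
    print(packaged)
```

Because the archive embeds the API key, treat the resulting zip file as a secret.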
The model performs the following:
Accepts the inference request from the requester.
Connects to OpenAI using the OpenAI API key.
def create(self, config: CustomInferenceConfig) -> GPTInference:
    inference = self.inference
    # Load the OpenAI API key packaged alongside the model code.
    with open(os.path.join(config.model_path, 'secret_key.json')) as file:
        auth = json.load(file)
    inference.model = OpenAI(api_key=auth['API_SECRET'])
    return inference
Takes the inference request and submits it to OpenAI. The inference result is returned to the BYOP model, which then returns it to the requester.
def _predict(self, input_data: InferenceData):
    generated_texts = []
    prompts = input_data["text"].tolist()
    # Submit one chat completion request per input prompt.
    for prompt in prompts:
        result = self.model.chat.completions.create(
            model="gpt-3.5-turbo-1106",
            messages=[
                {"role": "user", "content": prompt}
            ],
            temperature=1,
            max_tokens=256,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0
        )
        generated_texts.append(result.choices[0].message.content)
    # Return the original prompts alongside the generated text.
    prompt = np.array([str(x) for x in input_data["text"]])
    return {"text": prompt, "generated_text": np.array(generated_texts)}
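To see the shape of the data flowing through the _predict method without calling OpenAI, the loop can be exercised against a stand-in client. This is only a sketch: StubCompletions and stub_client are hypothetical test doubles that mimic the chat.completions.create call and the result.choices[0].message.content access used above, and plain lists replace the NumPy arrays.

```python
from types import SimpleNamespace


class StubCompletions:
    """Stand-in for client.chat.completions; returns a canned reply, no network."""

    def create(self, model, messages, **kwargs):
        prompt = messages[0]["content"]
        message = SimpleNamespace(content=f"summary of: {prompt}")
        return SimpleNamespace(choices=[SimpleNamespace(message=message)])


stub_client = SimpleNamespace(chat=SimpleNamespace(completions=StubCompletions()))


def predict(client, prompts):
    """Mirror the loop above: one chat completion per prompt, outputs aligned by row."""
    generated_texts = []
    for prompt in prompts:
        result = client.chat.completions.create(
            model="gpt-3.5-turbo-1106",
            messages=[{"role": "user", "content": prompt}],
            max_tokens=256,
        )
        generated_texts.append(result.choices[0].message.content)
    return {"text": list(prompts), "generated_text": generated_texts}


output = predict(stub_client, ["first prompt", "second prompt"])
print(output["generated_text"])
```

Each input row yields exactly one generated text, so the two output fields always have the same length.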
This model is contained in a Wallaroo pipeline which accepts the inference request, and returns the final result back to the requester.
Upload LLM with Managed Inference Endpoint Model
Uploading models uses the Wallaroo Client upload_model method, which takes the following parameters:
Parameter | Type | Description |
---|---|---|
name | string (Required) | The name of the model. Model names are unique per workspace. Models that are uploaded with the same name are assigned as a new version of the model. |
path | string (Required) | The path to the model file being uploaded. |
framework | string (Required) | The framework of the model from wallaroo.framework. |
input_schema | pyarrow.lib.Schema (Required) | The input schema in Apache Arrow schema format. |
output_schema | pyarrow.lib.Schema (Required) | The output schema in Apache Arrow schema format. |
The following shows the upload parameters for the gpt35.zip Wallaroo BYOP model with the following input and output schema:
- Input:
  - text (String): The input text.
- Output:
  - text (String): The original input text.
  - generated_text (String): The result returned from the GPT 3.5 model as a Managed Inference Endpoint.
input_schema = pa.schema([
    pa.field('text', pa.string()),
])
output_schema = pa.schema([
    pa.field('text', pa.string()),
    pa.field('generated_text', pa.string()),
])
gpt = wl.upload_model('gpt-35',
                      r'models/gpt35.zip',
                      framework=Framework.CUSTOM,
                      input_schema=input_schema,
                      output_schema=output_schema,
                      )
Waiting for model loading - this will take up to 10.0min.
Model is pending loading to a container runtime..
Model is attempting loading to a container runtime..............successful
Ready
Set Deployment Configuration
The deployment configuration sets the resources assigned to the LLM. For this example, the following resources are applied:
- gpt35.zip: 0.5 cpus, 1 Gi RAM.
deployment_config = wallaroo.DeploymentConfigBuilder() \
    .cpus(1) \
    .memory('1Gi') \
    .sidekick_cpus(gpt, 0.5) \
    .sidekick_memory(gpt, '1Gi') \
    .build()
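When sizing a cluster, it is useful to total what the deployment configuration above requests. The following back-of-the-envelope sketch assumes that the top-level cpus and memory values and the sidekick values for the model are requested separately and therefore add up, and it ignores any overhead Wallaroo itself adds. The requests dictionary and parse_gi helper are hypothetical.

```python
def parse_gi(memory: str) -> float:
    """Convert a '<n>Gi' string to a number of GiB (only the Gi suffix is handled)."""
    if not memory.endswith("Gi"):
        raise ValueError(f"unsupported memory unit: {memory}")
    return float(memory[:-2])


# Values copied from the deployment configuration above.
requests = {
    "engine": {"cpus": 1, "memory": "1Gi"},    # .cpus(1), .memory('1Gi')
    "gpt-35": {"cpus": 0.5, "memory": "1Gi"},  # .sidekick_cpus / .sidekick_memory
}

total_cpus = sum(item["cpus"] for item in requests.values())
total_memory_gi = sum(parse_gi(item["memory"]) for item in requests.values())
print(total_cpus, total_memory_gi)
```

Under these assumptions, the deployment asks for 1.5 CPUs and 2 Gi of RAM in total.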
Deploy Model
To deploy the model:
- We build a Wallaroo pipeline and assign the model as a pipeline step. For this tutorial it is called gpt35-wrapper-pipeline.
- The pipeline is deployed with the deployment configuration.
- Once the resource allocation is complete, the model is ready for inferencing.
See Model Deploy for more details on deploying LLMs in Wallaroo.
gpt_pipeline = wl.build_pipeline("gpt35-wrapper-pipeline") \
    .add_model_step(gpt) \
    .deploy(deployment_config=deployment_config)
Waiting for deployment - this will take up to 480s ..................................... ok
Generate Inference Request
The inference request is submitted as a pandas DataFrame with a single text entry.
import pandas as pd
text = '''Please summarize this text in one sentence:
Simplify production AI for seamless self-checkout or cashierless experiences at scale, enabling any retail store to offer a modern shopping journey.
We reduce the technical overhead and complexity for delivering a checkout experience that’s easy and efficient no matter where your stores are located.
Eliminate Checkout Delays: Easy and fast model deployment for a smooth self-checkout process, allowing customers to enjoy faster, hassle-free shopping experiences.
Drive Operational Efficiencies: Simplifying the process of scaling AI-driven self-checkout solutions to multiple retail locations ensuring uniform customer experiences no matter the location of the store while reducing in-store labor costs.
Continuous Improvement: Enabling integrated data insights for informing self-checkout improvements across various locations, ensuring the best customer experience, regardless of where they shop.
'''
data = pd.DataFrame({"text": [text]})
Submit Inference Request
The inference request is submitted to the pipeline with the infer method, which accepts either:
- pandas DataFrame
- Apache Arrow Table
The results are returned in the same format as submitted. For this example, a pandas DataFrame is submitted, so a pandas DataFrame is returned. The final generated text is displayed.
result = gpt_pipeline.infer(data)
result['out.generated_text'].iloc[0]
'This text outlines the benefits of simplifying production AI for self-checkout experiences at retail stores, including reducing technical overhead, eliminating checkout delays, driving operational efficiencies, and enabling continuous improvement for a seamless and efficient shopping journey.'
LLM Listener with OpenAI Managed Inference Endpoint LLM
The results of the BYOP with Managed Inference Endpoint are scored with an in-line LLM Validation Listener or an offline LLM Monitoring Listener. The following demonstrates using an LLM Listener to evaluate and score the outputs of the BYOP with Managed Inference Endpoint.
This demonstration uses the summarization_quality.zip model, a BYOP model that takes the generated text and returns a scored output.
Upload Summarization LLM Listener
To upload the Summarization LLM Listener, we set the input and output schema as follows.
- Inputs
  - text (String): The original inference request.
  - generated_text (String): The text returned from the Managed Inference Endpoint.
- Outputs
  - generated_text (String): The text returned from the Managed Inference Endpoint.
  - score (List[Float64]): The total score based on the generated_text field.
input_schema = pa.schema([
    pa.field('text', pa.string()),
    pa.field('generated_text', pa.string())
])
output_schema = pa.schema([
    pa.field('generated_text', pa.string()),
    pa.field('score', pa.list_(pa.float64())),
])
summarizer_listener = wl.upload_model('summarizer_listener',
                                      r'models/summarization_quality.zip',
                                      framework=Framework.CUSTOM,
                                      input_schema=input_schema,
                                      output_schema=output_schema
                                      )
Deploy Managed Inference Endpoint LLM with Summarizer Score
To deploy the Managed Inference Endpoint with the Summarizer Score model:
- Set the deployment configuration. For this example, we allocate the following resources per model:
  - gpt: 0.5 cpus, 1 Gi RAM.
  - summarizer_listener: 2 cpus, 8 Gi RAM.
deployment_config = wallaroo.DeploymentConfigBuilder() \
    .cpus(1) \
    .memory('1Gi') \
    .sidekick_cpus(gpt, 0.5) \
    .sidekick_memory(gpt, '1Gi') \
    .sidekick_cpus(summarizer_listener, 2) \
    .sidekick_memory(summarizer_listener, '8Gi') \
    .build()
We then build the pipeline and add both the GPT model and the summarizer listener to it. For additional context, we add an anomaly detection validation that flags any score below 0.75.
listener_pipeline = wl.build_pipeline('summarizer-listener') \
    .add_model_step(gpt) \
    .add_model_step(summarizer_listener) \
    .add_validations(incorrect_summary = pl.col('out.score').list.get(0) < 0.75) \
    .deploy(deployment_config=deployment_config)
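The validation expression above takes the first element of the out.score list and compares it with the threshold. The following plain-Python analogue is only meant to show the logic of that rule; it is not how Wallaroo evaluates validations internally, and the second sample row is hypothetical.

```python
THRESHOLD = 0.75


def incorrect_summary(score):
    """Plain-Python analogue of pl.col('out.score').list.get(0) < 0.75."""
    return score[0] < THRESHOLD


rows = [
    {"out.score": [0.7120675]},  # the score reported in this tutorial's results
    {"out.score": [0.9]},        # hypothetical higher-quality summary
]
flags = [incorrect_summary(row["out.score"]) for row in rows]
print(flags)
```

A row whose first score is below 0.75 is flagged, which matches the anomaly.incorrect_summary column in the inference results.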
Waiting for deployment - this will take up to 480s ..................................................................................................................................................................... ok
Generate Inference and Score Text
We now submit a similar inference request, this time asking for a five-word summary, with the added LLM Listener providing a score based on the summarized text.
text = '''Please summarize this text in 5 words:
Simplify production AI for seamless self-checkout or cashierless experiences at scale, enabling any retail store to offer a modern shopping journey.
We reduce the technical overhead and complexity for delivering a checkout experience that’s easy and efficient no matter where your stores are located.
Eliminate Checkout Delays: Easy and fast model deployment for a smooth self-checkout process, allowing customers to enjoy faster, hassle-free shopping experiences.
Drive Operational Efficiencies: Simplifying the process of scaling AI-driven self-checkout solutions to multiple retail locations ensuring uniform customer experiences no matter the location of the store while reducing in-store labor costs.
Continuous Improvement: Enabling integrated data insights for informing self-checkout improvements across various locations, ensuring the best customer experience, regardless of where they shop.
'''
data = pd.DataFrame({"text": [text]})
result = listener_pipeline.infer(data, timeout=10000)
result
| | time | in.text | out.generated_text | out.score | anomaly.count | anomaly.incorrect_summary |
---|---|---|---|---|---|---|
0 | 2024-07-16 18:52:35.226 | Please summarize this text in 5 words: \n\nSim... | AI simplifies self-checkout for retailers. | [0.7120675] | 1 | True |
Undeploy the Model
With the tutorial complete, we undeploy the pipeline and return its resources to the cluster.
listener_pipeline.undeploy()
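If an inference call raises an exception, the undeploy step may never run and the resources stay allocated. One way to guard against that is a small context manager that always calls undeploy(). This is a sketch only: undeploy_on_exit is a hypothetical helper, and StubPipeline stands in for a deployed pipeline so the example runs without a cluster.

```python
from contextlib import contextmanager


@contextmanager
def undeploy_on_exit(pipeline):
    """Yield the pipeline and always call undeploy(), even if the body raises."""
    try:
        yield pipeline
    finally:
        pipeline.undeploy()


class StubPipeline:
    """Stand-in for a deployed pipeline so the sketch runs without a cluster."""

    def __init__(self):
        self.deployed = True

    def undeploy(self):
        self.deployed = False


stub = StubPipeline()
try:
    with undeploy_on_exit(stub) as pipeline:
        raise RuntimeError("simulated inference failure")
except RuntimeError:
    pass
print(stub.deployed)
```

With a real pipeline, the same pattern would wrap the infer calls so the cluster resources are released whether or not they succeed.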