Apply MLOps best practices to advanced deployment options
MLOps is an essential practice for productionizing your Machine Learning workflows. With MLOps you can set up workflows that are catered for the ML lifecycle. These make it easy to centrally maintain resources, update/monitor models, and generally simplify the process as your ML experimentation scales up.
A key MLOps tool within the Amazon SageMaker ecosystem is SageMaker Pipelines. With SageMaker Pipelines you can define workflows that are composed of different ML steps. You can also structure these workflows by defining parameters that you can inject as variables into your Pipeline. For a more general introduction to SageMaker Pipelines, see the linked article.
Defining a Pipeline itself is not terribly complicated, but there are some advanced use cases that require additional configuration. Specifically, suppose you are training multiple models that are needed for inference in your ML use case. Within SageMaker there is a hosting option known as Multi-Model Endpoints (MME), where you can host multiple models on a single endpoint and invoke a target model. However, within SageMaker Pipelines there is no native support for defining or deploying an MME at this time. In this blog post, we will see how we can use a Lambda Step within Pipelines to deploy a multi-model endpoint in a custom manner, while adhering to MLOps best practices.
NOTE: For those of you who are new to AWS, make sure you create an account at the following link if you want to follow along. The article also assumes an intermediate understanding of SageMaker deployment; I would suggest following this article to understand Deployment/Inference more fully. In particular, for SageMaker Multi-Model Endpoints, I would refer to the following blog.
Setup
For this example, we will work in SageMaker Studio, where we have access to the visual interfaces for SageMaker Pipelines and other SageMaker components. For development, we'll use a Studio Notebook with a Data Science kernel on an ml.t3.medium instance. To get started, we first import the libraries needed for the different steps we will use within SageMaker Pipelines.
import os
import boto3
import re
import time
import json
from sagemaker import get_execution_role, session
import pandas as pd
from time import gmtime, strftime
import sagemaker
from sagemaker.model import Model
from sagemaker.image_uris import retrieve
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.model_step import ModelStep
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.parameters import ParameterString
from sagemaker.estimator import Estimator
# Custom Lambda Step
from sagemaker.workflow.lambda_step import (
LambdaStep,
LambdaOutput,
LambdaOutputTypeEnum,
)
from sagemaker.lambda_helper import Lambda
from sagemaker.workflow.pipeline import Pipeline
Next we create a Pipeline Session. This pipeline session ensures that none of the training jobs actually runs inside the notebook until the pipeline itself is executed.
pipeline_session = PipelineSession()
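The rest of the notebook also references a SageMaker execution role, region, and default S3 bucket. A minimal sketch of that setup is shown below, assuming the standard Studio defaults; the s3_prefix value is just an illustrative choice for this example's artifacts.
sagemaker_session = sagemaker.Session()
role = get_execution_role()                     # IAM role attached to the Studio notebook
region = sagemaker_session.boto_region_name
default_bucket = sagemaker_session.default_bucket()
s3_prefix = "xgboost-mme-pipelines"             # illustrative prefix for this example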
For this example we will use the Abalone data set (CC BY 4.0) and run the SageMaker XGBoost algorithm on it to build a regression model. You can download the data set from Amazon’s publicly available data sets.
!aws s3 cp s3://sagemaker-sample-files/datasets/tabular/uci_abalone/train_csv/abalone_dataset1_train.csv .
!aws s3 cp abalone_dataset1_train.csv s3://{default_bucket}/xgboost-regression/train.csv
training_path = 's3://{}/xgboost-regression/train.csv'.format(default_bucket)
Then we can parameterize our Pipeline by defining default values for both the training dataset and the instance type.
training_input_param = ParameterString(
name = "training_input",
default_value=training_path,
)

training_instance_param = ParameterString(
name = "training_instance",
default_value = "ml.c5.xlarge")
We also retrieve the AWS-provided container for XGBoost that we will use for training and inference.
model_path = f's3://{default_bucket}/{s3_prefix}/xgb_model'
image_uri = sagemaker.image_uris.retrieve(
framework="xgboost",
region=region,
version="1.0-1",
py_version="py3",
instance_type=training_instance_param,
)
image_uri
Training setup
For the training part of our Pipeline, we will set up the SageMaker XGBoost algorithm for our Abalone regression dataset.
xgb_train_one = Estimator(
image_uri=image_uri,
instance_type=training_instance_param,
instance_count=1,
output_path=model_path,
sagemaker_session=pipeline_session,
role=role
)
xgb_train_one.set_hyperparameters(
objective="reg:linear",
num_round=40,
max_depth=4,
eta=0.1,
gamma=3,
min_child_weight=5,
subsample=0.6,
silent=0,
)
For our second estimator, we change our hyperparameters to adjust our model training so that we have two separate models behind our multi-model endpoint.
xgb_train_two = Estimator(
image_uri=image_uri,
instance_type=training_instance_param,
instance_count=1,
output_path=model_path,
sagemaker_session=pipeline_session,
role=role
)
#adjusting hyperparams
xgb_train_two.set_hyperparameters(
objective="reg:linear",
num_round=50,
max_depth=5,
eta=0.2,
gamma=4,
min_child_weight=6,
subsample=0.7,
silent=0,
)
We then set up our training inputs so that both estimators point towards the parameter we defined for our S3 training data set.
train_args_one = xgb_train_one.fit(
inputs={
"train": TrainingInput(
s3_data=training_input_param,
content_type="text/csv",
)
}
)

train_args_two = xgb_train_two.fit(
inputs={
"train": TrainingInput(
s3_data=training_input_param,
content_type="text/csv",
)
}
)
We then define two separate training steps that will run in parallel through our Pipeline.
step_train_one = TrainingStep(
name="TrainOne",
step_args=train_args_one,
)

step_train_two = TrainingStep(
name = "TrainTwo",
step_args= train_args_two
)
Lambda Step
The Lambda Step essentially allows you to plug a Lambda function into your Pipeline. For each SageMaker training job, a model.tar.gz containing the trained model artifacts is emitted. Here we will use the Lambda Step to retrieve the artifacts of the trained models and deploy them to a SageMaker multi-model endpoint.
Before we can do that, we need to give our Lambda function the proper permissions to work with SageMaker. We can use the following helper script to create an IAM role for our Lambda function.
import boto3
import json

iam = boto3.client("iam")
def create_lambda_role(role_name):
try:
response = iam.create_role(
RoleName=role_name,
AssumeRolePolicyDocument=json.dumps(
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {"Service": "lambda.amazonaws.com"},
"Action": "sts:AssumeRole",
}
],
}
),
Description="Role for Lambda to call SageMaker functions",
)
role_arn = response["Role"]["Arn"]
response = iam.attach_role_policy(
RoleName=role_name,
PolicyArn="arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
)
response = iam.attach_role_policy(
PolicyArn="arn:aws:iam::aws:policy/AmazonSageMakerFullAccess", RoleName=role_name
)
return role_arn
except iam.exceptions.EntityAlreadyExistsException:
print(f"Using ARN from existing role: {role_name}")
response = iam.get_role(RoleName=role_name)
return response["Role"]["Arn"]
from iam_helper import create_lambda_role

lambda_role = create_lambda_role("lambda-deployment-role")
Once we’ve defined our Lambda role, we can create a Lambda function that does a few things for us:
- It takes each individual model.tar.gz from each training job and places it in a central S3 location that contains both tarballs. MME expects all model tarballs to be under a single S3 prefix.
- It uses the boto3 SageMaker client to create a SageMaker model, endpoint configuration, and endpoint.
We can use the following helper functions to accomplish the first task, by copying the training job artifacts to a central location in S3 with both model tarballs.
sm_client = boto3.client("sagemaker")
s3 = boto3.resource('s3')

def extract_bucket_key(model_data):
"""
Extracts the bucket and key from the model data tarballs that we are passing in
"""
bucket = model_data.split('/', 3)[2]
key = model_data.split('/', 3)[-1]
return [bucket, key]
def create_mme_dir(model_data_dir):
"""
Takes in a list of lists with the different trained models,
creates a central S3 bucket/key location with all model artifacts for MME.
"""
bucket_name = model_data_dir[0][0]
for i, model_data in enumerate(model_data_dir):
copy_source = {
'Bucket': bucket_name,
'Key': model_data[1]
}
bucket = s3.Bucket(bucket_name)
destination_key = 'xgboost-mme-pipelines/model-{}.tar.gz'.format(i)
bucket.copy(copy_source, destination_key)
mme_s3_path = 's3://{}/xgboost-mme-pipelines/'.format(bucket_name)
return mme_s3_path
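Inside the handler itself, the two trained model artifacts arrive through the Lambda event payload under the input names we give the LambdaStep later in the notebook. The following is a minimal sketch of how the handler might pass them to the helper above; the exact structure of the real lambda_helper.py handler is an assumption here.
def lambda_handler(event, context):
    # The LambdaStep passes each trained model artifact in under the input name we define
    model_artifacts_one = event["model_artifacts_one"]
    model_artifacts_two = event["model_artifacts_two"]
    # Parse each s3://bucket/key path and copy both tarballs under a single MME prefix
    model_data_dir = [extract_bucket_key(model_artifacts_one),
                      extract_bucket_key(model_artifacts_two)]
    model_url = create_mme_dir(model_data_dir)
    # model_url then becomes the ModelDataUrl of the SageMaker Model created in the
    # snippets below, after which the handler returns the response shown further down
    ...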
The next step for our Lambda function is to create the SageMaker entities necessary for a real-time endpoint:
- SageMaker Model: points to the model data and container image, and defines whether the endpoint is multi-model or single-model.
- SageMaker Endpoint Configuration: defines the hardware behind the endpoint, i.e. the instance type and count.
- SageMaker Endpoint: the REST endpoint that you invoke for inference; with MME, you also specify the target model you want to invoke in each request.
model_name = 'mme-source' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
create_model_response = sm_client.create_model(
ModelName=model_name,
Containers=[
{
"Image": image_uri,
"Mode": "MultiModel",
"ModelDataUrl": model_url
}
],
#to-do parameterize this
ExecutionRoleArn='arn:aws:iam::474422712127:role/sagemaker-role-BYOC',
)
print("Model Arn: " + create_model_response["ModelArn"])#Step 2: EPC Creation
xgboost_epc_name = "mme-source" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
endpoint_config_response = sm_client.create_endpoint_config(
EndpointConfigName=xgboost_epc_name,
ProductionVariants=[
{
"VariantName": "xgbvariant",
"ModelName": model_name,
"InstanceType": "ml.c5.large",
"InitialInstanceCount": 1
},
],
)
print("Endpoint Configuration Arn: " + endpoint_config_response["EndpointConfigArn"])
#Step 3: EP Creation
endpoint_name = "mme-source" + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
create_endpoint_response = sm_client.create_endpoint(
EndpointName=endpoint_name,
EndpointConfigName=xgboost_epc_name,
)
print("Endpoint Arn: " + create_endpoint_response["EndpointArn"])
Our Lambda function returns a success message once endpoint creation has been kicked off.
return {
"statusCode": 200,
"body": json.dumps("Created Endpoint!"),
"endpoint_name": endpoint_name
}
We then define this Lambda function in the necessary Lambda Step format for our Pipeline to detect.
# Lambda helper class can be used to create the Lambda function
func = Lambda(
function_name=function_name,
execution_role_arn=lambda_role,
script="code/lambda_helper.py",
handler="lambda_helper.lambda_handler",
)
We also define what we return from Lambda in the form of output parameters.
output_param_1 = LambdaOutput(output_name="statusCode", output_type=LambdaOutputTypeEnum.String)
output_param_2 = LambdaOutput(output_name="body", output_type=LambdaOutputTypeEnum.String)
output_param_3 = LambdaOutput(output_name="endpoint_name", output_type=LambdaOutputTypeEnum.String)
We then define our inputs with the two different trained model artifacts from the training steps we defined earlier in our notebook.
step_deploy_lambda = LambdaStep(
name="LambdaStep",
lambda_func=func,
inputs={
"model_artifacts_one": step_train_one.properties.ModelArtifacts.S3ModelArtifacts,
"model_artifacts_two": step_train_two.properties.ModelArtifacts.S3ModelArtifacts
},
outputs=[output_param_1, output_param_2, output_param_3],
)
Pipeline execution and sample inference
Now that we have our different steps set up, we can stitch all of this together into a single Pipeline. We pass in our three steps and the parameters we defined earlier. Note that you can also define more parameters than we did here, depending on your use case.
pipeline = Pipeline(
name="mme-pipeline",
steps=[step_train_one, step_train_two, step_deploy_lambda],
parameters= [training_input_param, training_instance_param]
)
Now we can execute the Pipeline with the following commands.
pipeline.upsert(role_arn=role)
execution = pipeline.start()
execution.wait()
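Because we parameterized the Pipeline, we can also kick off additional executions with different values without changing the pipeline definition; for example, overriding the training instance type (the value here is purely illustrative).
execution = pipeline.start(
    parameters={
        "training_instance": "ml.m5.xlarge"  # overrides the ml.c5.xlarge default
    }
)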
After execution, we can see in the Pipelines tab of the Studio UI that a Directed Acyclic Graph (DAG) has been created for our Pipeline to display its workflow.
After a few minutes, you should also see that an endpoint has been created in the SageMaker Console.
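Rather than watching the console, you can also poll the endpoint status programmatically. Below is a small sketch, assuming you have captured the endpoint name (for example, from the SageMaker console or the LambdaStep output) in an endpoint_name variable.
sm_client = boto3.client("sagemaker")

# Poll until the multi-model endpoint leaves the "Creating" state
describe_endpoint_response = sm_client.describe_endpoint(EndpointName=endpoint_name)
while describe_endpoint_response["EndpointStatus"] == "Creating":
    time.sleep(15)
    describe_endpoint_response = sm_client.describe_endpoint(EndpointName=endpoint_name)
print(describe_endpoint_response["EndpointStatus"])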
We can then test this endpoint with a sample inference to make sure it works correctly.
import boto3
smr = boto3.client('sagemaker-runtime') # client for inference

# specify the tarball you are invoking in the TargetModel param
resp = smr.invoke_endpoint(EndpointName=endpoint_name, Body=b'.345,0.224414,.131102,0.042329,.279923,-0.110329,-0.099358,0.0',
ContentType='text/csv', TargetModel = 'model-0.tar.gz')
print(resp['Body'].read())
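Because this is a multi-model endpoint, we can route the same request to the second trained model simply by changing the TargetModel parameter, pointing it at the model-1.tar.gz tarball that the Lambda function copied into the MME prefix.
# Invoke the second model behind the same endpoint
resp = smr.invoke_endpoint(EndpointName=endpoint_name,
                           Body=b'.345,0.224414,.131102,0.042329,.279923,-0.110329,-0.099358,0.0',
                           ContentType='text/csv', TargetModel='model-1.tar.gz')
print(resp['Body'].read())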
Additional Resources and Conclusion
The code for the full example can be found at the link above (stay tuned for more Pipelines examples). This sample combines an advanced hosting option with MLOps best practices. Using MLOps tooling as you scale your ML experimentation is crucial, as it helps simplify and parameterize your efforts so that teams can collaborate and track work more easily. I hope this article was a useful overview of using Pipelines for a specific MME hosting use case. As always, all feedback is appreciated, and thank you for reading!