Speaker diarization, an essential process in audio analysis, segments an audio file based on speaker identity. This post delves into integrating Hugging Face's PyAnnote for speaker diarization with Amazon SageMaker asynchronous endpoints.
We provide a comprehensive guide on how to deploy speaker segmentation and clustering solutions using SageMaker on the AWS Cloud. You can use this solution for applications that deal with multi-speaker (over 100) audio recordings.
Solution overview
Amazon Transcribe is the go-to service for speaker diarization on AWS. However, for non-supported languages, you can use other models (in our case, PyAnnote) that will be deployed in SageMaker for inference. For short audio files where inference takes up to 60 seconds, you can use real-time inference. For audio longer than 60 seconds, asynchronous inference should be used. The added benefit of asynchronous inference is the cost savings from automatically scaling the instance count to zero when there are no requests to process.
Hugging Face is a popular open source hub for machine learning (ML) models. AWS and Hugging Face have a partnership that enables seamless integration through SageMaker with a set of AWS Deep Learning Containers (DLCs) for training and inference in PyTorch or TensorFlow, and Hugging Face estimators and predictors for the SageMaker Python SDK. SageMaker features and capabilities help developers and data scientists get started with natural language processing (NLP) on AWS with ease.
The integration for this solution involves using Hugging Face's pre-trained speaker diarization model with the PyAnnote library. PyAnnote is an open source toolkit written in Python for speaker diarization. This model, trained on sample audio data, enables efficient speaker partitioning in audio files. The model is deployed on SageMaker as an asynchronous endpoint, providing efficient and scalable processing of diarization tasks.
The following diagram illustrates the architecture of the solution.
For this post, we use the following audio file.
Stereo or multichannel audio files are automatically downmixed to mono by channel averaging. Audio files sampled at a different rate are automatically resampled to 16 kHz upon loading.
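PyAnnote performs this conversion automatically when it loads the file, so no extra preprocessing is required by the solution. Purely for illustration, the following is a minimal sketch of an equivalent manual step using librosa (which also appears later in requirements.txt); the file names are placeholders:
import librosa
import soundfile as sf

# librosa downmixes to mono and resamples to 16 kHz when mono=True and sr=16000
# are specified, which matches the normalization PyAnnote applies on load.
waveform, sample_rate = librosa.load("input.wav", sr=16000, mono=True)

# Persist the normalized audio if you want to inspect or reuse it.
sf.write("input_16k_mono.wav", waveform, sample_rate)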
Prerequisites
Complete the following prerequisites:
Create a SageMaker domain.
Ensure that your AWS Identity and Access Management (IAM) user has the necessary access permissions to create a SageMaker role.
Make sure your AWS account has a service quota to host a SageMaker endpoint for an ml.g5.2xlarge instance.
Create a model function to access the PyAnnote speaker diarization model from Hugging Face
You can use the Hugging Face Hub to access the desired pre-trained PyAnnote speaker diarization model. You use the same script to download the model file when creating the SageMaker endpoint.
See the following code:
from pyannote.audio import Pipeline

def model_fn(model_dir):
    # Load the pre-trained diarization pipeline from the Hugging Face Hub
    model = Pipeline.from_pretrained(
        "pyannote/speaker-diarization-3.1",
        use_auth_token="Replace-with-the-Hugging-Face-auth-token")
    return model
Package model code
Prepare essential files such as inference.py, which contains the inference code:
%%writefile model/code/inference.py
from pyannote.audio import Pipeline
import subprocess
import boto3
from urllib.parse import urlparse
import pandas as pd
from io import StringIO
import os
import torch

def model_fn(model_dir):
    # Load the pre-trained diarization pipeline from the Hugging Face Hub
    model = Pipeline.from_pretrained(
        "pyannote/speaker-diarization-3.1",
        use_auth_token="hf_oBxxxxxxxxxxxx")
    return model

def diarization_from_s3(model, s3_file, language=None):
    # Download the audio file from Amazon S3 to local storage
    s3 = boto3.client("s3")
    o = urlparse(s3_file, allow_fragments=False)
    bucket = o.netloc
    key = o.path.lstrip("/")
    s3.download_file(bucket, key, "tmp.wav")
    # Run the diarization pipeline and collect one row per speaker turn
    result = model("tmp.wav")
    data = {}
    for turn, _, speaker in result.itertracks(yield_label=True):
        data[turn] = (turn.start, turn.end, speaker)
    data_df = pd.DataFrame(data.values(), columns=["start", "end", "speaker"])
    print(data_df.shape)
    result = data_df.to_json(orient="split")
    return result

def predict_fn(data, model):
    s3_file = data.pop("s3_file")
    language = data.pop("language", None)
    result = diarization_from_s3(model, s3_file, language)
    return {
        "diarization_from_s3": result
    }
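Before packaging the code, you can optionally sanity-check the handlers from the notebook. The following is a minimal sketch, not part of the original walkthrough; the S3 URI is a hypothetical placeholder and the test assumes the dependencies are installed locally:
import importlib.util

# Load the handlers directly from the file written above
spec = importlib.util.spec_from_file_location("inference", "model/code/inference.py")
inference = importlib.util.module_from_spec(spec)
spec.loader.exec_module(inference)

pipeline = inference.model_fn(model_dir=None)  # model_dir is unused by this handler
payload = {"s3_file": "s3://<your-bucket>/<path-to-audio>.wav"}  # hypothetical URI
print(inference.predict_fn(payload, pipeline))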
Prepare a requirements.txt file, which contains the Python libraries needed to run the inference:
with open("model/code/requirements.txt", "w") as f:
f.write("transformers==4.25.1\n")
f.write("boto3\n")
f.write("PyAnnote.audio\n")
f.write("soundfile\n")
f.write("librosa\n")
f.write("onnxruntime\n")
f.write("wget\n")
f.write("pandas")
Finally, compress the inference.py and requirements.txt and save it as model.tar.gz:
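The packaging command itself isn't shown in the post; the following is a minimal sketch that builds model.tar.gz from the contents of the model/ directory created by the %%writefile cells above (confirm the paths against your own layout):
import tarfile
import os

# Package everything under model/ (code/inference.py and code/requirements.txt)
# into model.tar.gz, preserving the relative directory structure SageMaker expects.
with tarfile.open("model.tar.gz", "w:gz") as tar:
    for root, _, files in os.walk("model"):
        for name in files:
            path = os.path.join(root, name)
            tar.add(path, arcname=os.path.relpath(path, "model"))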
Set up a SageMaker model
Define a SageMaker model resource by specifying the image URI, the model data location in Amazon Simple Storage Service (Amazon S3), and the SageMaker role:
import sagemaker
import boto3

sess = sagemaker.Session()

sagemaker_session_bucket = None
if sagemaker_session_bucket is None and sess is not None:
    # Fall back to the default bucket if no bucket name is given
    sagemaker_session_bucket = sess.default_bucket()

try:
    role = sagemaker.get_execution_role()
except ValueError:
    iam = boto3.client("iam")
    role = iam.get_role(RoleName="sagemaker_execution_role")["Role"]["Arn"]

sess = sagemaker.Session(default_bucket=sagemaker_session_bucket)

print(f"sagemaker role arn: {role}")
print(f"sagemaker bucket: {sess.default_bucket()}")
print(f"sagemaker session region: {sess.boto_region_name}")
Set up an asynchronous endpoint to deploy the model to SageMaker using the provided asynchronous inference configuration:
from sagemaker.huggingface.model import HuggingFaceModel
from sagemaker.async_inference.async_inference_config import AsyncInferenceConfig
from sagemaker.s3 import s3_path_join
from sagemaker.utils import name_from_base

async_endpoint_name = name_from_base("custom-asyc")

# create Hugging Face Model Class
huggingface_model = HuggingFaceModel(
    model_data=s3_location,       # path to your model and script
    role=role,                    # IAM role with permissions to create an endpoint
    transformers_version="4.17",  # transformers version used
    pytorch_version="1.10",       # pytorch version used
    py_version="py38",            # python version used
)

# create async endpoint configuration
async_config = AsyncInferenceConfig(
    output_path=s3_path_join(
        "s3://", sagemaker_session_bucket, "async_inference/output"
    ),  # Where our results will be stored
    # Add notification SNS if needed
    notification_config={
        # "SuccessTopic": "PUT YOUR SUCCESS SNS TOPIC ARN",
        # "ErrorTopic": "PUT YOUR ERROR SNS TOPIC ARN",
    },  # Notification configuration
)

env = {"MODEL_SERVER_WORKERS": "2"}

# deploy the endpoint
async_predictor = huggingface_model.deploy(
    initial_instance_count=1,
    instance_type="ml.xx",  # for example, ml.g5.2xlarge (see the prerequisites)
    async_inference_config=async_config,
    endpoint_name=async_endpoint_name,
    env=env,
)
Test the endpoint
Evaluate the endpoint functionality by sending an audio file for diarization and retrieving the JSON output stored in the specified S3 output path:
from sagemaker.async_inference import WaiterConfig

# Replace with the S3 URI of the audio object to diarize
data = {"s3_file": "s3://<your-bucket>/<path-to-audio-file>.wav"}

res = async_predictor.predict_async(data=data)
print(f"Response output path: {res.output_path}")
print("Start Polling to get response:")

config = WaiterConfig(
    max_attempts=10,  # number of attempts
    delay=10,         # time in seconds to wait between attempts
)

res.get_result(config)
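Because predict_fn returns the diarization as a JSON string in split orientation, you can load the polled result back into a DataFrame for inspection. This is a small illustrative step rather than part of the original walkthrough, and it assumes the predictor's deserializer returns the response as a dictionary:
import pandas as pd
from io import StringIO

# Fetch the result (polls the S3 output location written by the async endpoint)
result = res.get_result(config)

# Rebuild the start/end/speaker table from the "split"-oriented JSON string
diarization_df = pd.read_json(StringIO(result["diarization_from_s3"]), orient="split")
print(diarization_df.head())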
To implement this solution at scale, we suggest using AWS Lambda, Amazon Simple Notification Service (Amazon SNS), or Amazon Simple Queue Service (Amazon SQS). These services are designed for scalability, event-driven architectures, and efficient resource utilization. They can help decouple the asynchronous inference process from result processing, allowing you to scale each component independently and handle bursts of inference requests more effectively.
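As a sketch of what this decoupling could look like, the following hypothetical Lambda handler submits each newly uploaded audio object to the asynchronous endpoint through the boto3 SageMaker runtime. The endpoint name environment variable, staging prefix, and S3 trigger wiring are assumptions for illustration only:
import json
import os
import boto3

sagemaker_runtime = boto3.client("sagemaker-runtime")
s3 = boto3.client("s3")

def lambda_handler(event, context):
    # Triggered by an S3 event; each record points at a newly uploaded audio file.
    output_locations = []
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        key = record["s3"]["object"]["key"]

        # The async endpoint reads its request payload from S3, so write the
        # {"s3_file": ...} JSON to a staging location first (prefix is illustrative).
        payload_key = f"async_inference/input/{os.path.basename(key)}.json"
        s3.put_object(
            Bucket=bucket,
            Key=payload_key,
            Body=json.dumps({"s3_file": f"s3://{bucket}/{key}"}),
        )

        response = sagemaker_runtime.invoke_endpoint_async(
            EndpointName=os.environ["ENDPOINT_NAME"],  # hypothetical environment variable
            InputLocation=f"s3://{bucket}/{payload_key}",
            ContentType="application/json",
        )
        output_locations.append(response["OutputLocation"])
    return {"output_locations": output_locations}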
Results
The model output is stored at s3://sagemaker-xxxx/async_inference/output/. The output shows that the audio recording has been segmented into three columns:
Start (start time in seconds)
End (end time in seconds)
Speaker (speaker label)
The following code shows an example of our results:
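The actual results are not reproduced here; as a purely illustrative sketch, the split-oriented JSON produced by to_json(orient="split") has the following shape, with placeholder timestamps and speaker labels:
{"columns": ["start", "end", "speaker"],
 "index": [0, 1, 2],
 "data": [[0.6, 4.5, "SPEAKER_00"], [4.9, 7.1, "SPEAKER_01"], [7.3, 11.8, "SPEAKER_00"]]}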
You can set your scaling policy to zero by setting MinCapacity to 0; asynchronous inference lets you automatically scale in to zero when there are no requests. You don't need to delete the endpoint; it scales out from zero when needed again, reducing costs when it's not in use. See the following code:
# Common class representing Application Auto Scaling for SageMaker
client = boto3.client('application-autoscaling')

# This is the format in which Application Auto Scaling references the endpoint
resource_id = 'endpoint/' + async_endpoint_name + '/variant/' + 'variant1'

# Define and register your endpoint variant
response = client.register_scalable_target(
    ServiceNamespace='sagemaker',
    ResourceId=resource_id,
    ScalableDimension='sagemaker:variant:DesiredInstanceCount',  # The number of EC2 instances for your Amazon SageMaker model endpoint variant.
    MinCapacity=0,
    MaxCapacity=5
)
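The preceding snippet only registers the scalable target; to scale on demand you also attach a scaling policy. The following is a minimal sketch of a target tracking policy on the ApproximateBacklogSizePerInstance metric commonly used with asynchronous endpoints; the policy name, target value, and cooldowns are assumptions:
# Attach a target tracking policy that scales on the queued-request backlog per instance
response = client.put_scaling_policy(
    PolicyName='async-backlog-scaling',  # illustrative name
    ServiceNamespace='sagemaker',
    ResourceId=resource_id,
    ScalableDimension='sagemaker:variant:DesiredInstanceCount',
    PolicyType='TargetTrackingScaling',
    TargetTrackingScalingPolicyConfiguration={
        'TargetValue': 5.0,  # desired backlog of requests per instance (assumption)
        'CustomizedMetricSpecification': {
            'MetricName': 'ApproximateBacklogSizePerInstance',
            'Namespace': 'AWS/SageMaker',
            'Dimensions': [{'Name': 'EndpointName', 'Value': async_endpoint_name}],
            'Statistic': 'Average',
        },
        'ScaleInCooldown': 300,   # seconds to wait before scaling in
        'ScaleOutCooldown': 300,  # seconds to wait before scaling out
    },
)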
If you want to remove the endpoint, use the following code:
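The cleanup snippet itself isn't included in the post; a minimal sketch using the predictor created earlier could look like this:
# Delete the model and the asynchronous endpoint created by this walkthrough
async_predictor.delete_model()
async_predictor.delete_endpoint()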
The solution can efficiently handle multiple or large audio files.
This example uses a single instance for demonstration purposes. If you want to use this solution for hundreds or thousands of audio files and process them across multiple instances with an asynchronous endpoint, you can use an auto scaling policy, which is designed for a large number of source documents. Auto scaling dynamically adjusts the number of instances provisioned for a model in response to changes in your workload.
The solution optimizes resources and reduces system load by separating long-running tasks from real-time inference.
Conclusion
In this post, we provided a straightforward approach to deploy the Hugging Face speaker diarization model on SageMaker using Python scripts. Using an asynchronous endpoint provides an efficient and scalable means to deliver diarization predictions as a service, accommodating concurrent requests seamlessly.
Get started today with asynchronous speaker diarization for your audio projects. Reach out in the comments if you have any questions about getting your own asynchronous diarization endpoint up and running.
About the authors
Sanjay Tiwary is an AI/ML Solutions Architect who spends his time working with strategic customers to define business requirements, deliver L300 sessions around specific use cases, and design AI/ML applications and services that are scalable, reliable, and performant. He has helped launch and scale the AI/ML-powered Amazon SageMaker service and has implemented several proofs of concept using Amazon AI services. He has also developed an advanced analytics platform as part of the digital transformation journey.
Kiran Challapalli is a deep tech business developer in the public sector at AWS. He has over 8 years of AI/ML experience and 23 years of overall software development and sales experience. Kiran helps public sector businesses across India explore and co-create cloud-based solutions using AI, machine learning, and generative AI technologies, including large language models.