amazon SageMaker Pipelines incluye características que le permiten optimizar y automatizar los flujos de trabajo de aprendizaje automático (ML). Esto permite a los científicos y desarrolladores de modelos centrarse en el desarrollo de modelos y la experimentación rápida en lugar de en la gestión de infraestructura.
Pipelines ofrece la capacidad de orquestar flujos de trabajo de aprendizaje automático complejos con un SDK de Python simple con la capacidad de visualizar esos flujos de trabajo a través de SageMaker Studio. Esto ayuda con la preparación de datos y las tareas de ingeniería de funciones, así como con la capacitación de modelos y la automatización de la implementación. Pipelines también se integra con el ajuste automático de modelos de amazon SageMaker, que puede encontrar automáticamente los valores de hiperparámetros que dan como resultado el modelo de mejor rendimiento, según lo determine la métrica elegida.
Los modelos conjuntos se están volviendo populares dentro de las comunidades de ML. Generan predicciones más precisas combinando las predicciones de múltiples modelos. Las canalizaciones se pueden utilizar rápidamente para crear una canalización de aprendizaje automático de un extremo a otro para modelos de conjunto. Esto permite a los desarrolladores crear modelos de alta precisión manteniendo la eficiencia y la reproducibilidad.
En esta publicación, proporcionamos un ejemplo de un modelo de conjunto que se entrenó e implementó mediante Pipelines.
Descripción general del caso de uso
Los representantes de ventas generan nuevos clientes potenciales y crean oportunidades dentro de Salesforce para realizar un seguimiento de ellos. La siguiente aplicación es un enfoque de aprendizaje automático que utiliza el aprendizaje no supervisado para identificar automáticamente casos de uso en cada oportunidad en función de diversa información de texto, como nombre, descripción, detalles y grupo de servicio del producto.
El análisis preliminar mostró que los casos de uso varían según la industria y los diferentes casos de uso tienen una distribución muy diferente de los ingresos anualizados y pueden ayudar con la segmentación. Por lo tanto, un caso de uso es una característica predictiva importante que puede optimizar los análisis y mejorar los modelos de recomendación de ventas.
Podemos tratar la identificación de casos de uso como un problema de identificación de temas y exploramos diferentes modelos de identificación de temas, como el análisis semántico latente (LSA), la asignación latente de Dirichlet (LDA) y BERTopic. Tanto en LSA como en LDA, cada documento se trata solo como una colección de palabras y el orden de las palabras o la función gramatical no importa, lo que puede causar cierta pérdida de información al determinar el tema. Además, requieren una cantidad predeterminada de temas, lo que fue difícil de determinar en nuestro conjunto de datos. Dado que BERTopic superó el problema anterior, se utilizó para identificar el caso de uso.
El enfoque utiliza tres modelos BERTopic secuenciales para generar el agrupamiento final en un método jerárquico.
Cada modelo BERTopic consta de cuatro partes:
- incrustar – Se pueden utilizar diferentes métodos de incrustación en BERTopic. En este escenario, los datos de entrada provienen de varias áreas y generalmente se ingresan manualmente. Como resultado, utilizamos la incrustación de oraciones para garantizar la escalabilidad y el procesamiento rápido.
- Reducción de dimensiones – Utilizamos Proyección y aproximación de colector uniforme (UMAP), que es un método de reducción de dimensiones no lineal y no supervisado, para reducir vectores de texto de alta dimensión.
- Agrupación – Utilizamos el método de agrupación y reducción iterativa equilibrada mediante jerarquías (BIRCH) para formar diferentes grupos de casos de uso.
- Identificación de palabras clave – Utilizamos TF-IDF basado en clases para extraer las palabras más representativas de cada grupo.
Modelo de conjunto secuencial
No hay un número predeterminado de temas, por lo que establecemos una entrada para el número de grupos entre 15 y 25 temas. Tras la observación, algunos de los temas son amplios y generales. Por lo tanto, se les aplica individualmente otra capa del modelo BERTopic. Después de combinar todos los temas recién identificados en el modelo de segunda capa y junto con los temas originales de los resultados de la primera capa, el posprocesamiento se realiza manualmente para finalizar la identificación del tema. Por último, se utiliza una tercera capa en algunos de los grupos para crear subtemas.
Para permitir que los modelos de segunda y tercera capa funcionen eficazmente, necesita un archivo de mapeo para asignar los resultados de los modelos anteriores a palabras o frases específicas. Esto ayuda a garantizar que la agrupación sea precisa y relevante.
Estamos utilizando la optimización bayesiana para el ajuste de hiperparámetros y la validación cruzada para reducir el sobreajuste. El conjunto de datos contiene características como nombre de la oportunidad, detalles de la oportunidad, necesidades, nombre del producto asociado, detalles del producto y grupos de productos. Los modelos se evalúan utilizando una función de pérdida personalizada y se selecciona el mejor modelo de incorporación.
Desafíos y consideraciones
Estos son algunos de los desafíos y consideraciones de esta solución:
- La capacidad de preprocesamiento de datos del canal es crucial para mejorar el rendimiento del modelo. Con la capacidad de preprocesar los datos entrantes antes del entrenamiento, podemos asegurarnos de que nuestros modelos reciban datos de alta calidad. Algunos de los pasos de preprocesamiento y limpieza de datos incluyen convertir todas las columnas de texto a minúsculas, eliminar elementos de plantilla, contracciones, URL, correos electrónicos, etc., eliminar etiquetas NER no relevantes y lematizar el texto combinado. El resultado son predicciones más precisas y fiables.
- Necesitamos un entorno informático que sea altamente escalable para que podamos manejar y entrenar sin esfuerzo millones de filas de datos. Esto nos permite realizar tareas de modelado y procesamiento de datos a gran escala con facilidad y reduce el tiempo y los costos de desarrollo.
- Debido a que cada paso del flujo de trabajo de ML requiere requisitos de recursos variables, una canalización flexible y adaptable es esencial para una asignación eficiente de recursos. Podemos reducir el tiempo total de procesamiento, lo que resulta en un desarrollo e implementación de modelos más rápidos, al optimizar el uso de recursos para cada paso.
- La ejecución de scripts personalizados para el procesamiento de datos y el entrenamiento de modelos requiere la disponibilidad de los marcos y dependencias necesarios.
- Coordinar el entrenamiento de múltiples modelos puede resultar un desafío, especialmente cuando cada modelo posterior depende del resultado del anterior. El proceso de orquestar el flujo de trabajo entre estos modelos puede ser complejo y llevar mucho tiempo.
- Después de cada capa de entrenamiento, es necesario revisar un mapeo que refleje los temas producidos por el modelo y usarlo como entrada para la capa de modelo posterior.
Descripción general de la solución
En esta solución, el punto de entrada es amazon SageMaker Studio, que es un entorno de desarrollo integrado (IDE) basado en web proporcionado por AWS que permite a los científicos de datos y desarrolladores de ML construir, entrenar e implementar modelos de ML a escala de una manera colaborativa y eficiente. manera.
Los siguientes diagramas ilustran la arquitectura de alto nivel de la solución.
Como parte de la arquitectura, utilizamos los siguientes pasos de canalización de SageMaker:
- Procesamiento de SageMaker – Este paso le permite preprocesar y transformar datos antes del entrenamiento. Un beneficio de este paso es la capacidad de utilizar algoritmos integrados para transformaciones de datos comunes y escalado automático de recursos. También puede utilizar código personalizado para el preprocesamiento de datos complejos y le permite utilizar imágenes de contenedor personalizadas.
- Entrenamiento de SageMaker – Este paso le permite entrenar modelos de ML utilizando algoritmos integrados de SageMaker o código personalizado. Puede utilizar la capacitación distribuida para acelerar la capacitación del modelo.
- Devolución de llamada de SageMaker – Este paso le permite ejecutar código personalizado durante el flujo de trabajo de ML, como enviar notificaciones o activar pasos de procesamiento adicionales. Puede ejecutar procesos externos y reanudar el flujo de trabajo de la canalización al finalizar en este paso.
- Modelo SageMaker – Este paso le permite crear o registrar un modelo en amazon SageMaker.
Tutorial de implementación
Primero, configuramos la canalización de Sagemaker:
import boto3
import sagemaker
# create a Session with custom region (e.g. us-east-1), will be None if not specified
region = ""
# allocate default S3 bucket for SageMaker session, will be None if not specified
default_bucket = ""
boto_session = boto3.Session(region_name=region
sagemaker_client = boto_session.client("sagemaker")
Inicializar una sesión de SageMaker
sagemaker_session = sagemaker.session.Session(boto_session=boto_session, sagemaker_client=sagemaker_client, default_bucket= default_bucket,)
Establecer el rol de ejecución de Sagemaker para la sesión
role = sagemaker.session.get_execution_role(sagemaker_session)
Gestionar interacciones en contexto de canalización
pipeline_session = sagemaker.workflow.pipeline_context.PipelineSession(boto_session=boto_session, sagemaker_client=sagemaker_client, default_bucket=default_bucket,)
Definir la imagen base para que se ejecuten los scripts
account_id = role.split(":")(4)
# create a base image that take care of dependencies
ecr_repository_name = "".
tag = "latest"
container_image_uri = "artificial intelligence.dkr.ecr.crypto currency.amazonaws.com/tech education:education tech".format(account_id, region, ecr_repository_name, tag)
A continuación se ofrece una explicación detallada de los pasos del flujo de trabajo:
- Preprocesar los datos – Esto implica limpiar y preparar los datos para la ingeniería de funciones y dividir los datos en conjuntos de entrenamiento, prueba y validación.
import os
BASE_DIR = os.path.dirname(os.path.realpath(__file__))
from sagemaker.workflow.parameters import ParameterString
from sagemaker.workflow.steps import ProcessingStep
from sagemaker.processing import (
ProcessingInput,
ProcessingOutput,
ScriptProcessor,
)
processing_instance_type = ParameterString(
name="ProcessingInstanceType",
# choose an instance type suitable for the job
default_value="ml.m5.4xlarge"
)
script_processor = ScriptProcessor(
image_uri=container_image_uri,
command=("python"),
instance_type=processing_instance_type,
instance_count=1,
role=role,
)
# define the data preprocess job
step_preprocess = ProcessingStep(
name="DataPreprocessing",
processor=script_processor,
inputs=(
ProcessingInput(source=BASE_DIR, destination="/opt/ml/processing/input/code/")
),
outputs=(
ProcessingOutput(output_name="data_train", source="/opt/ml/processing/data_train"), # output data and dictionaries etc for later steps
)
code=os.path.join(BASE_DIR, "preprocess.py"),
)
- Capa de tren 1 modelo BERTopic – Se utiliza un paso de entrenamiento de SageMaker para entrenar la primera capa del modelo BERTopic utilizando una imagen de amazon Elastic Container Registry (amazon ECR) y un script de entrenamiento personalizado.
base_job_prefix="OppUseCase"
from sagemaker.workflow.steps import TrainingStep
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput
training_instance_type = ParameterString(
name="TrainingInstanceType",
default_value="ml.m5.4xlarge"
)
# create an estimator for training job
estimator_first_layer = Estimator(
image_uri=container_image_uri,
instance_type=training_instance_type,
instance_count=1,
output_path= f"s3://{default_bucket}/{base_job_prefix}/train_first_layer", # S3 bucket where the training output be stored
role=role,
entry_point = "train_first_layer.py"
)
# create training job for the estimator based on inputs from data-preprocess step
step_train_first_layer = TrainingStep(
name="TrainFirstLayerModel",
estimator = estimator_first_layer,
inputs={
TrainingInput(
s3_data=step_preprocess.properties.ProcessingOutputConfig.Outputs( "data_train" ).S3Output.S3Uri,
),
},
)
from sagemaker.workflow.callback_step import CallbackStep, CallbackOutput, CallbackOutputTypeEnum
first_sqs_queue_to_use = ParameterString(
name="FirstSQSQueue",
default_value= , # add queue url
)
first_callback_output = CallbackOutput(output_name="s3_mapping_first_update", output_type=CallbackOutputTypeEnum.String)
step_first_mapping_update = CallbackStep(
name="FirstMappingUpdate",
sqs_queue_url= first_sqs_queue_to_use,
# Input arguments that will be provided in the SQS message
inputs={
"input_location": f"s3://{default_bucket}/{base_job_prefix}/mapping",
"output_location": f"s3://{default_bucket}/{base_job_prefix}/ mapping_first_update "
},
outputs=(
first_callback_output,
),
)
step_first_mapping_update.add_depends_on((step_train_first_layer)) # call back is run after the step_train_first_layer
- Tren capa 2 modelo BERTopic – Se utiliza otro SageMaker TrainingStep para entrenar la segunda capa del modelo BERTopic utilizando una imagen ECR y un script de entrenamiento personalizado.
estimator_second_layer = Estimator(
image_uri=container_image_uri,
instance_type=training_instance_type, # same type as of first train layer
instance_count=1,
output_path=f"s3://{bucket}/{base_job_prefix}/train_second_layer", # S3 bucket where the training output be stored
role=role,
entry_point = "train_second_layer.py"
)
# create training job for the estimator based on inputs from preprocessing, output of previous call back step and first train layer step
step_train_second_layer = TrainingStep(
name="TrainSecondLayerModel",
estimator = estimator_second_layer,
inputs={
TrainingInput(
s3_data=step_preprocess.properties.ProcessingOutputConfig.Outputs( "data_train").S3Output.S3Uri,
),
TrainingInput(
# Output of the previous call back step
s3_data= step_first_mapping_update.properties.Outputs("s3_mapping_first_update"),
),
TrainingInput(
s3_data=f"s3://{bucket}/{base_job_prefix}/train_first_layer"
),
}
)
- Utilice un paso de devolución de llamada – De manera similar al paso 3, esto implica enviar un mensaje a una cola SQS que activa una función Lambda. La función Lambda actualiza el archivo de mapeo en amazon S3 y envía un token de éxito a la canalización para reanudar su ejecución.
second_sqs_queue_to_use = ParameterString(
name="SecondSQSQueue",
default_value= , # add queue url
)
second_callback_output = CallbackOutput(output_name="s3_mapping_second_update", output_type=CallbackOutputTypeEnum.String)
step_second_mapping_update = CallbackStep(
name="SecondMappingUpdate",
sqs_queue_url= second_sqs_queue_to_use,
# Input arguments that will be provided in the SQS message
inputs={
"input_location": f"s3://{default_bucket}/{base_job_prefix}/mapping_first_update ",
"output_location": f"s3://{default_bucket}/{base_job_prefix}/mapping_second_update "
},
outputs=(
second_callback_output,
),
)
step_second_mapping_update.add_depends_on((step_train_second_layer)) # call back is run after the step_train_second_layer
- Tren capa 3 modelo BERTopic – Esto implica obtener el archivo de mapeo de amazon S3 y entrenar la tercera capa del modelo BERTopic utilizando una imagen ECR y un script de entrenamiento personalizado.
estimator_third_layer = Estimator(
image_uri=container_image_uri,
instance_type=training_instance_type, # same type as of prvious two train layers
instance_count=1,
output_path=f"s3://{default_bucket}/{base_job_prefix}/train_third_layer", # S3 bucket where the training output be stored
role=role,
entry_point = "train_third_layer.py"
)
# create training job for the estimator based on inputs from preprocess step, second callback step and outputs of previous two train layers
step_train_third_layer = TrainingStep(
name="TrainThirdLayerModel",
estimator = estimator_third_layer,
inputs={
TrainingInput(
s3_data=step_preprocess.properties.ProcessingOutputConfig.Outputs("data_train").S3Output.S3Uri,
),
TrainingInput(
# s3_data = Output of the previous call back step
s3_data= step_second_mapping_update.properties.Outputs(' s3_mapping_second_update’),
),
TrainingInput(
s3_data=f"s3://{default_bucket}/{base_job_prefix}/train_first_layer"
),
TrainingInput(
s3_data=f"s3://{default_bucket}/{base_job_prefix}/train_second_layer"
),
}
)
- Registra el modelo – Se utiliza un paso de modelo de SageMaker para registrar el modelo en el registro de modelos de SageMaker. Cuando el modelo está registrado, puede utilizarlo a través de un canal de inferencia de SageMaker.
from sagemaker.model import Model
from sagemaker.workflow.model_step import ModelStep
model = Model(
image_uri=container_image_uri,
model_data=step_train_third_layer.properties.ModelArtifacts.S3ModelArtifacts,
sagemaker_session=sagemaker_session,
role=role,
)
register_args = model.register(
content_types=("text/csv"),
response_types=("text/csv"),
inference_instances=("ml.c5.9xlarge", "ml.m5.xlarge"),
model_package_group_name=model_package_group_name,
approval_status=model_approval_status,
)
step_register = ModelStep(name="OppUseCaseRegisterModel", step_args=register_args)
Para entrenar eficazmente un modelo BERTopic y los métodos BIRCH y UMAP, necesita una imagen de entrenamiento personalizada que pueda proporcionar dependencias adicionales y el marco necesario para ejecutar el algoritmo. Para obtener una muestra funcional de una imagen de Docker personalizada, consulte Crear una imagen de contenedor de Docker personalizada para SageMaker.
Conclusión
En esta publicación, explicamos cómo puede utilizar la amplia gama de pasos que ofrece SageMaker Pipelines con imágenes personalizadas para entrenar un modelo de conjunto. Para obtener más información sobre cómo comenzar con Pipelines utilizando una plantilla de operaciones de aprendizaje automático (MLOps) existente, consulte Creación, automatización, administración y escalamiento de flujos de trabajo de aprendizaje automático mediante canalizaciones de amazon SageMaker.
Acerca de los autores
Bikramjeet Singh es un científico aplicado en el equipo de Sales Insights, Analytics and Data Science (SIADS) de AWS, responsable de crear la plataforma GenAI y las soluciones de infraestructura ai/ML para científicos de ML dentro de SIADS. Antes de trabajar como AS, Bikram trabajó como ingeniero de desarrollo de software en SIADS y Alexa ai.
Raúl Sharma es arquitecto senior de soluciones especializado en AWS y ayuda a los clientes de AWS a crear soluciones de aprendizaje automático e inteligencia artificial generativa. Antes de unirse a AWS, Rahul pasó varios años en las industrias de finanzas y seguros, ayudando a los clientes a crear plataformas de datos y análisis.
Sachin Mishra es un profesional experimentado con 16 años de experiencia en la industria en roles de liderazgo de software y consultoría tecnológica. Sachin lideró la función de ciencia e ingeniería de estrategia de ventas en AWS. En este puesto, fue responsable de ampliar el análisis cognitivo para la estrategia de ventas, aprovechando tecnologías avanzadas de IA/ML para generar conocimientos y optimizar los resultados comerciales.
Nada Abdalla es un científico investigador en AWS. Su trabajo y experiencia abarcan múltiples áreas científicas en estadística y aprendizaje automático, incluidos análisis de texto, sistemas de recomendación, modelado bayesiano y pronóstico. Anteriormente trabajó en el mundo académico y obtuvo su maestría y doctorado en Bioestadística de UCLA. A través de su trabajo en el mundo académico y la industria, publicó varios artículos en prestigiosas revistas de estadística y conferencias de aprendizaje automático aplicado. En su tiempo libre le gusta correr y pasar tiempo con su familia.