Los portátiles no son suficientes para el aprendizaje automático a escala
Todas las imágenes, a menos que se indique lo contrario, son del autor.
Existe un malentendido (por no decir una fantasía) que sigue reapareciendo en las empresas cuando se trata de IA y aprendizaje automático. Las personas a menudo juzgan mal la complejidad y las habilidades necesarias para llevar a producción proyectos de aprendizaje automático, ya sea porque no entienden el trabajo o (peor aún) porque creen que lo entienden, cuando no es así.
Su primera reacción al descubrir la IA podría ser algo como “La IA es en realidad bastante simple, solo necesito un Jupyter Notebook, copiar y pegar código de aquí y allá, o preguntarle a Copilot, y boom. Después de todo, no es necesario contratar científicos de datos…” Y la historia siempre termina mal, con amargura, decepción y la sensación de que la IA es una estafa: dificultad para pasar a producción, deriva de datos, errores, comportamiento no deseado.
Así que escribámoslo de una vez por todas: IA/aprendizaje automático/cualquier trabajo relacionado con datos es un trabajo real, no un pasatiempo. Requiere habilidades, artesanía y herramientas. Si cree que puede realizar ML en producción con portátiles, está equivocado.
Este artículo tiene como objetivo mostrar, con un ejemplo sencillo, todo el esfuerzo, habilidades y herramientas que se necesitan para pasar de un cuaderno a un pipeline real en producción. Porque el ML en producción se trata, principalmente, de poder automatizar la ejecución de su código de forma regular, con automatización y monitoreo.
Y para aquellos que buscan un tutorial completo sobre “canalizaciones de cuaderno a vértice”, esto puede resultarles útil.
Imaginemos que es un científico de datos que trabaja en una empresa de comercio electrónico. Su empresa vende ropa en línea y el equipo de marketing le pide ayuda: están preparando una oferta especial para productos específicos y les gustaría dirigirse a los clientes de manera eficiente adaptando el contenido del correo electrónico que se les enviará para maximizar la conversión. Por lo tanto, su trabajo es simple: a cada cliente se le debe asignar una puntuación que represente la probabilidad de que compre un producto de la oferta especial.
La oferta especial se dirigirá específicamente a esas marcas, lo que significa que el equipo de marketing quiere saber qué clientes comprarán su próximo producto de las siguientes marcas:
Allegra K, Calvin Klein, Carhartt, Hanes, Volcom, Nautica, Quiksilver, Diesel, Dockers, Hurley
Para este artículo utilizaremos un conjunto de datos disponible públicamente de Google, el `ellook_ecommerce` conjunto de datos. Contiene datos falsos con transacciones, datos de clientes, datos de productos, todo lo que tendríamos a nuestra disposición cuando trabajamos en un minorista de moda online.
Para seguir este cuaderno, necesitará acceso a Google Cloud Platform, pero la lógica se puede replicar a otros proveedores de nube o terceros como Neptune, MLFlow, etc.
Como científico de datos respetable, comienza creando un cuaderno que nos ayudará a explorar los datos.
Primero importamos bibliotecas que usaremos durante este artículo:
import catboost as cb
import pandas as pd
import sklearn as sk
import numpy as np
import datetime as dtfrom dataclasses import dataclass
from sklearn.model_selection import train_test_split
from google.cloud import bigquery
%load_ext watermark
%watermark --packages catboost,pandas,sklearn,numpy,google.cloud.bigquery
catboost : 1.0.4
pandas : 1.4.2
numpy : 1.22.4
google.cloud.bigquery: 3.2.0
Obtener y preparar los datos.
Luego cargaremos los datos de BigQuery usando el cliente Python. Asegúrese de utilizar su propia identificación de proyecto:
query = """
SELECT
transactions.user_id,
products.brand,
products.category,
products.department,
products.retail_price,
users.gender,
users.age,
users.created_at,
users.country,
users.city,
transactions.created_at
FROM `bigquery-public-data.thelook_ecommerce.order_items` as transactions
LEFT JOIN `bigquery-public-data.thelook_ecommerce.users` as users
ON transactions.user_id = users.id
LEFT JOIN `bigquery-public-data.thelook_ecommerce.products` as products
ON transactions.product_id = products.id
WHERE status <> 'Cancelled'
"""client = bigquery.Client()
df = client.query(query).to_dataframe()
Deberías ver algo como esto al mirar el marco de datos:
Estos representan las transacciones/compras realizadas por los clientes, enriquecidas con información del cliente y del producto.
Dado que nuestro objetivo es predecir qué marca comprarán los clientes en su próxima compra, procederemos de la siguiente manera:
- Compras grupales cronológicamente para cada cliente
- Si un cliente tiene N compras, consideramos la enésima compra como objetivo y la N-1 como nuestras características.
- Por lo tanto excluimos a los clientes con solo 1 compra.
Pongamos eso en código:
# Compute recurrent customers
recurrent_customers = df.groupby('user_id')('created_at').count().to_frame("n_purchases")# Merge with dataset and filter those with more than 1 purchase
df = df.merge(recurrent_customers, left_on='user_id', right_index=True, how='inner')
df = df.query('n_purchases > 1')
# Fill missing values
df.fillna('NA', inplace=True)
target_brands = (
'Allegra K',
'Calvin Klein',
'Carhartt',
'Hanes',
'Volcom',
'Nautica',
'Quiksilver',
'Diesel',
'Dockers',
'Hurley'
)
aggregation_columns = ('brand', 'department', 'category')
# Group purchases by user chronologically
df_agg = (df.sort_values('created_at')
.groupby(('user_id', 'gender', 'country', 'city', 'age'), as_index=False)(('brand', 'department', 'category'))
.agg({k: ";".join for k in ('brand', 'department', 'category')})
)
# Create the target
df_agg('last_purchase_brand') = df_agg('brand').apply(lambda x: x.split(";")(-1))
df_agg('target') = df_agg('last_purchase_brand').isin(target_brands)*1
df_agg('age') = df_agg('age').astype(float)
# Remove last item of sequence features to avoid target leakage :
for col in aggregation_columns:
df_agg(col) = df_agg(col).apply(lambda x: ";".join(x.split(";")(:-1)))
Observe cómo eliminamos el último elemento de la secuencia de características: esto es muy importante ya que de lo contrario obtendremos lo que llamamos una “fuga de datos”: el objetivo es parte de las características, el modelo recibe la respuesta cuando aprende.
Ahora tenemos esta nueva df_agg
marco de datos:
Comparando con el marco de datos original, vemos que user_id 2 efectivamente compró IZOD, Parke & Ronen y finalmente Orvis, que no está en las marcas objetivo.
Dividir en tren, validación y prueba.
Como científico de datos experimentado, ahora dividirá sus datos en diferentes conjuntos, ya que obviamente sabe que los tres son necesarios para realizar un aprendizaje automático riguroso. (La validación cruzada está fuera del alcance de hoy, amigos, hagámoslo simple).
Una cosa clave al dividir los datos es utilizar el método no tan conocido stratify
parámetro de scikit-learn train_test_split()
método. La razón de esto es el desequilibrio de clases: si la distribución objetivo (% de 0 y 1 en nuestro caso) difiere entre el entrenamiento y las pruebas, podríamos sentirnos frustrados por los malos resultados al implementar el modelo. ML 101 kids: mantenga las distribuciones de datos lo más similares posible entre los datos de entrenamiento y los datos de prueba.
# Remove unecessary featuresdf_agg.drop('last_purchase_category', axis=1, inplace=True)
df_agg.drop('last_purchase_brand', axis=1, inplace=True)
df_agg.drop('user_id', axis=1, inplace=True)
# Split the data into train and eval
df_train, df_val = train_test_split(df_agg, stratify=df_agg('target'), test_size=0.2)
print(f"{len(df_train)} samples in train")
df_train, df_val = train_test_split(df_agg, stratify=df_agg('target'), test_size=0.2)
print(f"{len(df_train)} samples in train")
# 30950 samples in train
df_val, df_test = train_test_split(df_val, stratify=df_val('target'), test_size=0.5)
print(f"{len(df_val)} samples in val")
print(f"{len(df_test)} samples in test")
# 3869 samples in train
# 3869 samples in test
Una vez hecho esto, dividiremos elegantemente nuestro conjunto de datos entre características y objetivos:
X_train, y_train = df_train.iloc(:, :-1), df_train('target')
X_val, y_val = df_val.iloc(:, :-1), df_val('target')
X_test, y_test = df_test.iloc(:, :-1), df_test('target')
Entre las características se encuentran diferentes tipos. Normalmente los separamos entre:
- Características numéricas: son continuas y reflejan una cantidad mensurable u ordenada.
- características categóricas: generalmente son discretas y a menudo se representan como cadenas (por ejemplo, un país, un color, etc.)
- caracteristicas del texto: suelen ser secuencias de palabras.
Por supuesto que puede haber más como imagen, vídeo, audio, etc.
El modelo: presentando CatBoost
Para nuestro problema de clasificación (ya sabías que estábamos en un marco de clasificación, ¿no?), usaremos una biblioteca simple pero muy poderosa: CatBoost. Está construido y mantenido por Yandex y proporciona una API de alto nivel para jugar fácilmente con árboles potenciados. Está cerca de XGBoost, aunque no funciona exactamente igual bajo el capó.
CatBoost ofrece un buen contenedor para manejar funciones de diferentes tipos. En nuestro caso, algunas características pueden considerarse “texto” ya que son la concatenación de palabras, como “Calvin Klein;BCBGeneration;Hanes”. Tratar con este tipo de funciones a veces puede ser doloroso, ya que es necesario manejarlas con divisores de texto, tokenizadores, lematizadores, etc. ¡Con suerte, CatBoost puede administrar todo por nosotros!
# Define features
features = {
'numerical': ('retail_price', 'age'),
'static': ('gender', 'country', 'city'),
'dynamic': ('brand', 'department', 'category')
}# Build CatBoost "pools", which are datasets
train_pool = cb.Pool(
X_train,
y_train,
cat_features=features.get("static"),
text_features=features.get("dynamic"),
)
validation_pool = cb.Pool(
X_val,
y_val,
cat_features=features.get("static"),
text_features=features.get("dynamic"),
)
# Specify text processing options to handle our text features
text_processing_options = {
"tokenizers": (
{"tokenizer_id": "SemiColon", "delimiter": ";", "lowercasing": "false"}
),
"dictionaries": ({"dictionary_id": "Word", "gram_order": "1"}),
"feature_processing": {
"default": (
{
"dictionaries_names": ("Word"),
"feature_calcers": ("BoW"),
"tokenizers_names": ("SemiColon"),
}
),
},
}
Ahora estamos listos para definir y entrenar nuestro modelo. Revisar todos y cada uno de los parámetros está fuera del alcance actual ya que la cantidad de parámetros es bastante impresionante, pero no dude en verificar la API usted mismo.
Y para abreviar, hoy no realizaremos ajustes de hiperparámetros, ¡pero esto es obviamente una gran parte del trabajo del científico de datos!
# Train the model
model = cb.CatBoostClassifier(
iterations=200,
loss_function="Logloss",
random_state=42,
verbose=1,
auto_class_weights="SqrtBalanced",
use_best_model=True,
text_processing=text_processing_options,
eval_metric='AUC'
)model.fit(
train_pool,
eval_set=validation_pool,
verbose=10
)
Y listo, nuestro modelo está entrenado. ¿Terminamos?
No. Necesitamos verificar que el rendimiento de nuestro modelo entre el entrenamiento y las pruebas sea consistente. Una brecha enorme entre el entrenamiento y las pruebas significa que nuestro modelo está sobreajustado (es decir, “aprende los datos de entrenamiento de memoria y no es bueno para predecir datos invisibles”).
Para la evaluación de nuestro modelo, utilizaremos la puntuación ROC-AUC. Tampoco profundizaré en este tema, pero según mi propia experiencia, esta es una métrica generalmente bastante sólida y mucho mejor que la precisión.
Una breve nota al margen sobre la precisión: normalmente no recomiendo usar esto como métrica de evaluación. Piense en un conjunto de datos desequilibrado en el que tiene un 1% de positivos y un 99% de negativos. ¿Cuál sería la precisión de un modelo muy tonto que predice 0 todo el tiempo? 99%. Entonces la precisión no es útil aquí.
from sklearn.metrics import roc_auc_scoreprint(f"ROC-AUC for train set : {roc_auc_score(y_true=y_train, y_score=model.predict(X_train)):.2f}")
print(f"ROC-AUC for validation set : {roc_auc_score(y_true=y_val, y_score=model.predict(X_val)):.2f}")
print(f"ROC-AUC for test set : {roc_auc_score(y_true=y_test, y_score=model.predict(X_test)):.2f}")
ROC-AUC for train set : 0.612
ROC-AUC for validation set : 0.586
ROC-AUC for test set : 0.622
Para ser honesto, 0,62 AUC no es nada bueno y es un poco decepcionante para el experto científico de datos que es. Nuestro modelo definitivamente necesita un poco de ajuste de parámetros aquí, y tal vez también deberíamos realizar la ingeniería de características más en serio.
Pero ya es mejor que las predicciones aleatorias (uf):
# random predictionsprint(f"ROC-AUC for train set : {roc_auc_score(y_true=y_train, y_score=np.random.rand(len(y_train))):.3f}")
print(f"ROC-AUC for validation set : {roc_auc_score(y_true=y_val, y_score=np.random.rand(len(y_val))):.3f}")
print(f"ROC-AUC for test set : {roc_auc_score(y_true=y_test, y_score=np.random.rand(len(y_test))):.3f}")
ROC-AUC for train set : 0.501
ROC-AUC for validation set : 0.499
ROC-AUC for test set : 0.501
Supongamos que por ahora estamos satisfechos con nuestro modelo y nuestro portátil. Aquí es donde se detendrían los científicos de datos aficionados. Entonces, ¿cómo damos el siguiente paso y estamos listos para la producción?
Conozca a Docker
Docker es un conjunto de productos de plataforma como servicio que utilizan virtualización a nivel de sistema operativo para entregar software en paquetes llamados contenedores. Dicho esto, piense en Docker como un código que puede ejecutarse en todas partes y que le permite evitar la situación de “funciona en su máquina pero no en la mía”.
¿Por qué utilizar Docker? Porque entre cosas interesantes como poder compartir su código, conservar versiones del mismo y garantizar su fácil implementación en todas partes, también se puede utilizar para crear canalizaciones. Ten paciencia conmigo y lo entenderás a medida que avanzamos.
El primer paso para crear una aplicación en contenedores es refactorizar y limpiar nuestro desordenado cuaderno. Vamos a definir 2 archivos, preprocess.py
y train.py
para nuestro ejemplo muy simple, y colóquelos en un src
directorio. También incluiremos nuestra requirements.txt
archivo con todo lo que contiene.
# src/preprocess.pyfrom sklearn.model_selection import train_test_split
from google.cloud import bigquery
def create_dataset_from_bq():
query = """
SELECT
transactions.user_id,
products.brand,
products.category,
products.department,
products.retail_price,
users.gender,
users.age,
users.created_at,
users.country,
users.city,
transactions.created_at
FROM `bigquery-public-data.thelook_ecommerce.order_items` as transactions
LEFT JOIN `bigquery-public-data.thelook_ecommerce.users` as users
ON transactions.user_id = users.id
LEFT JOIN `bigquery-public-data.thelook_ecommerce.products` as products
ON transactions.product_id = products.id
WHERE status <> 'Cancelled'
"""
client = bigquery.Client(project='<replace_with_your_project_id>')
df = client.query(query).to_dataframe()
print(f"{len(df)} rows loaded.")
# Compute recurrent customers
recurrent_customers = df.groupby('user_id')('created_at').count().to_frame("n_purchases")
# Merge with dataset and filter those with more than 1 purchase
df = df.merge(recurrent_customers, left_on='user_id', right_index=True, how='inner')
df = df.query('n_purchases > 1')
# Fill missing value
df.fillna('NA', inplace=True)
target_brands = (
'Allegra K',
'Calvin Klein',
'Carhartt',
'Hanes',
'Volcom',
'Nautica',
'Quiksilver',
'Diesel',
'Dockers',
'Hurley'
)
aggregation_columns = ('brand', 'department', 'category')
# Group purchases by user chronologically
df_agg = (df.sort_values('created_at')
.groupby(('user_id', 'gender', 'country', 'city', 'age'), as_index=False)(('brand', 'department', 'category'))
.agg({k: ";".join for k in ('brand', 'department', 'category')})
)
# Create the target
df_agg('last_purchase_brand') = df_agg('brand').apply(lambda x: x.split(";")(-1))
df_agg('target') = df_agg('last_purchase_brand').isin(target_brands)*1
df_agg('age') = df_agg('age').astype(float)
# Remove last item of sequence features to avoid target leakage :
for col in aggregation_columns:
df_agg(col) = df_agg(col).apply(lambda x: ";".join(x.split(";")(:-1)))
df_agg.drop('last_purchase_category', axis=1, inplace=True)
df_agg.drop('last_purchase_brand', axis=1, inplace=True)
df_agg.drop('user_id', axis=1, inplace=True)
return df_agg
def make_data_splits(df_agg):
df_train, df_val = train_test_split(df_agg, stratify=df_agg('target'), test_size=0.2)
print(f"{len(df_train)} samples in train")
df_val, df_test = train_test_split(df_val, stratify=df_val('target'), test_size=0.5)
print(f"{len(df_val)} samples in val")
print(f"{len(df_test)} samples in test")
return df_train, df_val, df_test
# src/train.pyimport catboost as cb
import pandas as pd
import sklearn as sk
import numpy as np
import argparse
from sklearn.metrics import roc_auc_score
def train_and_evaluate(
train_path: str,
validation_path: str,
test_path: str
):
df_train = pd.read_csv(train_path)
df_val = pd.read_csv(validation_path)
df_test = pd.read_csv(test_path)
df_train.fillna('NA', inplace=True)
df_val.fillna('NA', inplace=True)
df_test.fillna('NA', inplace=True)
X_train, y_train = df_train.iloc(:, :-1), df_train('target')
X_val, y_val = df_val.iloc(:, :-1), df_val('target')
X_test, y_test = df_test.iloc(:, :-1), df_test('target')
features = {
'numerical': ('retail_price', 'age'),
'static': ('gender', 'country', 'city'),
'dynamic': ('brand', 'department', 'category')
}
train_pool = cb.Pool(
X_train,
y_train,
cat_features=features.get("static"),
text_features=features.get("dynamic"),
)
validation_pool = cb.Pool(
X_val,
y_val,
cat_features=features.get("static"),
text_features=features.get("dynamic"),
)
test_pool = cb.Pool(
X_test,
y_test,
cat_features=features.get("static"),
text_features=features.get("dynamic"),
)
params = CatBoostParams()
text_processing_options = {
"tokenizers": (
{"tokenizer_id": "SemiColon", "delimiter": ";", "lowercasing": "false"}
),
"dictionaries": ({"dictionary_id": "Word", "gram_order": "1"}),
"feature_processing": {
"default": (
{
"dictionaries_names": ("Word"),
"feature_calcers": ("BoW"),
"tokenizers_names": ("SemiColon"),
}
),
},
}
# Train the model
model = cb.CatBoostClassifier(
iterations=200,
loss_function="Logloss",
random_state=42,
verbose=1,
auto_class_weights="SqrtBalanced",
use_best_model=True,
text_processing=text_processing_options,
eval_metric='AUC'
)
model.fit(
train_pool,
eval_set=validation_pool,
verbose=10
)
roc_train = roc_auc_score(y_true=y_train, y_score=model.predict(X_train))
roc_eval = roc_auc_score(y_true=y_val, y_score=model.predict(X_val))
roc_test = roc_auc_score(y_true=y_test, y_score=model.predict(X_test))
print(f"ROC-AUC for train set : {roc_train:.2f}")
print(f"ROC-AUC for validation set : {roc_eval:.2f}")
print(f"ROC-AUC for test. set : {roc_test:.2f}")
return {"model": model, "scores": {"train": roc_train, "eval": roc_eval, "test": roc_test}}
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("--train-path", type=str)
parser.add_argument("--validation-path", type=str)
parser.add_argument("--test-path", type=str)
parser.add_argument("--output-dir", type=str)
args, _ = parser.parse_known_args()
_ = train_and_evaluate(
args.train_path,
args.validation_path,
args.test_path)
Mucho más limpio ahora. ¡Puedes ejecutar tu script desde la línea de comando ahora!
$ python train.py --train-path xxx --validation-path yyy etc.
Ahora estamos listos para crear nuestra imagen de Docker. Para eso necesitamos escribir un Dockerfile en la raíz del proyecto:
# DockerfileFROM python:3.8-slim
WORKDIR /
COPY requirements.txt /requirements.txt
COPY src /src
RUN pip install --upgrade pip && pip install -r requirements.txt
ENTRYPOINT ( "bash" )
Esto tomará nuestros requisitos, copie el src
carpeta y su contenido, e instale los requisitos con pip cuando se construya la imagen.
Para crear e implementar esta imagen en un registro de contenedor, podemos usar el SDK de Google Cloud y el gcloud
comandos:
PROJECT_ID = ...
IMAGE_NAME=f'thelook_training_demo'
IMAGE_TAG='latest'
IMAGE_URI='eu.gcr.io/{}/{}:{}'.format(PROJECT_ID, IMAGE_NAME, IMAGE_TAG)!gcloud builds submit --tag $IMAGE_URI .
Si todo va bien deberías ver algo como esto:
Vertex Pipelines, el paso a la producción
Las imágenes de Docker son el primer paso para realizar un aprendizaje automático serio en producción. El siguiente paso es construir lo que llamamos “oleoductos”. Los pipelines son una serie de operaciones orquestadas por un marco llamado Kubeflow. Kubeflow puede ejecutarse en Vertex ai en Google Cloud.
Las razones para preferir pipelines a notebooks en producción pueden ser discutibles, pero les daré tres según mi experiencia:
- Monitoreo y reproducibilidad: cada canalización se almacena con sus artefactos (conjuntos de datos, modelos, métricas), lo que significa que puede comparar ejecuciones, volver a ejecutarlas y auditarlas. Cada vez que vuelves a ejecutar un cuaderno, pierdes el historial (o tienes que administrar los artefactos tú mismo, así como los registros. Buena suerte).
- Costos: Ejecutar una computadora portátil implica tener una máquina en la que se ejecuta. — Esta máquina tiene un costo y, para modelos grandes o conjuntos de datos enormes, necesitará máquinas virtuales con especificaciones estrictas.
— Tienes que acordarte de apagarlo cuando no lo uses.
— O simplemente puede bloquear su máquina local si decide no utilizar una máquina virtual y tiene otras aplicaciones ejecutándose.
– Las canalizaciones de Vertex ai son una sin servidor servicio, lo que significa que no tiene que administrar la infraestructura subyacente y solo paga por lo que usa, es decir, el tiempo de ejecución. - Escalabilidad: Buena suerte al ejecutar docenas de experimentos en su computadora portátil local simultáneamente. Volverá a utilizar una máquina virtual, la escalará y volverá a leer el punto anterior.
La última razón para preferir los pipelines a los portátiles es subjetiva y también muy discutible, pero en mi opinión los portátiles simplemente no están diseñados para ejecutar cargas de trabajo según un cronograma. Sin embargo, son geniales para la exploración.
Utilice un trabajo cron con al menos una imagen de Docker, o canalizaciones si desea hacer las cosas de la manera correcta, pero nunca, nunca, ejecute una computadora portátil en producción.
Sin más preámbulos, escribamos los componentes de nuestro pipeline:
# IMPORT REQUIRED LIBRARIES
from kfp.v2 import dsl
from kfp.v2.dsl import (Artifact,
Dataset,
Input,
Model,
Output,
Metrics,
Markdown,
HTML,
component,
OutputPath,
InputPath)
from kfp.v2 import compiler
from google.cloud.aiplatform import pipeline_jobs%watermark --packages kfp,google.cloud.aiplatform
kfp : 2.7.0
google.cloud.aiplatform: 1.50.0
El primer componente descargará los datos de Bigquery y los almacenará como un archivo CSV.
¡La BASE_IMAGE que usamos es la imagen que construimos previamente! Podemos usarlo para importar módulos y funciones que definimos en nuestra imagen de Docker. src
carpeta:
@component(
base_image=BASE_IMAGE,
output_component_file="get_data.yaml"
)
def create_dataset_from_bq(
output_dir: Output(Dataset),
):from src.preprocess import create_dataset_from_bq
df = create_dataset_from_bq()
df.to_csv(output_dir.path, index=False)
Siguiente paso: dividir datos
@component(
base_image=BASE_IMAGE,
output_component_file="train_test_split.yaml",
)
def make_data_splits(
dataset_full: Input(Dataset),
dataset_train: Output(Dataset),
dataset_val: Output(Dataset),
dataset_test: Output(Dataset)):import pandas as pd
from src.preprocess import make_data_splits
df_agg = pd.read_csv(dataset_full.path)
df_agg.fillna('NA', inplace=True)
df_train, df_val, df_test = make_data_splits(df_agg)
print(f"{len(df_train)} samples in train")
print(f"{len(df_val)} samples in train")
print(f"{len(df_test)} samples in test")
df_train.to_csv(dataset_train.path, index=False)
df_val.to_csv(dataset_val.path, index=False)
df_test.to_csv(dataset_test.path, index=False)
Siguiente paso: entrenamiento modelo. Guardaremos las puntuaciones del modelo para mostrarlas en el siguiente paso:
@component(
base_image=BASE_IMAGE,
output_component_file="train_model.yaml",
)
def train_model(
dataset_train: Input(Dataset),
dataset_val: Input(Dataset),
dataset_test: Input(Dataset),
model: Output(Model)
):import json
from src.train import train_and_evaluate
outputs = train_and_evaluate(
dataset_train.path,
dataset_val.path,
dataset_test.path
)
cb_model = outputs('model')
scores = outputs('scores')
model.metadata("framework") = "catboost"
# Save the model as an artifact
with open(model.path, 'w') as f:
json.dump(scores, f)
El último paso es calcular las métricas (que en realidad se calculan en el entrenamiento del modelo). Es simplemente necesario, pero es bueno mostrarle lo fácil que es construir componentes livianos. Observe cómo en este caso no construimos el componente a partir de BASE_IMAGE (que a veces puede ser bastante grande), sino que solo construimos una imagen liviana con los componentes necesarios:
@component(
base_image="python:3.9",
output_component_file="compute_metrics.yaml",
)
def compute_metrics(
model: Input(Model),
train_metric: Output(Metrics),
val_metric: Output(Metrics),
test_metric: Output(Metrics)
):import json
file_name = model.path
with open(file_name, 'r') as file:
model_metrics = json.load(file)
train_metric.log_metric('train_auc', model_metrics('train'))
val_metric.log_metric('val_auc', model_metrics('eval'))
test_metric.log_metric('test_auc', model_metrics('test'))
Por lo general, hay otros pasos que podemos incluir, como si queremos implementar nuestro modelo como un punto final de API, pero esto es de nivel más avanzado y requiere crear otra imagen de Docker para servir el modelo. Estará cubierto la próxima vez.
Ahora peguemos los componentes:
# USE TIMESTAMP TO DEFINE UNIQUE PIPELINE NAMES
TIMESTAMP = dt.datetime.now().strftime("%Y%m%d%H%M%S")
DISPLAY_NAME = 'pipeline-thelook-demo-{}'.format(TIMESTAMP)
PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"# Define the pipeline. Notice how steps reuse outputs from previous steps
@dsl.pipeline(
pipeline_root=PIPELINE_ROOT,
# A name for the pipeline. Use to determine the pipeline Context.
name="pipeline-demo"
)
def pipeline(
project: str = PROJECT_ID,
region: str = REGION,
display_name: str = DISPLAY_NAME
):
load_data_op = create_dataset_from_bq()
train_test_split_op = make_data_splits(
dataset_full=load_data_op.outputs("output_dir")
)
train_model_op = train_model(
dataset_train=train_test_split_op.outputs("dataset_train"),
dataset_val=train_test_split_op.outputs("dataset_val"),
dataset_test=train_test_split_op.outputs("dataset_test"),
)
model_evaluation_op = compute_metrics(
model=train_model_op.outputs("model")
)
# Compile the pipeline as JSON
compiler.Compiler().compile(
pipeline_func=pipeline,
package_path='thelook_pipeline.json'
)
# Start the pipeline
start_pipeline = pipeline_jobs.PipelineJob(
display_name="thelook-demo-pipeline",
template_path="thelook_pipeline.json",
enable_caching=False,
location=REGION,
project=PROJECT_ID
)
# Run the pipeline
start_pipeline.run(service_account=<your_service_account_here>)
Si todo funciona bien, ahora verá su canalización en la interfaz de usuario de Vertex:
Puedes hacer clic en él y ver los diferentes pasos:
La ciencia de datos, a pesar de que todos los entusiastas del código bajo o sin código le dicen que no es necesario ser desarrollador para realizar aprendizaje automático, es un trabajo real. Como todo trabajo, requiere habilidades, conceptos y herramientas que van más allá de los cuadernos.
Y para aquellos que aspiran a convertirse en científicos de datos, esta es la realidad del trabajo.
Feliz codificación.