En Databricks, ayudo a grandes organizaciones minoristas a implementar y escalar canales de datos y aprendizaje automático. Aquí están los 8 más importantes. Chispa – chispear consejos/trucos que he aprendido en el campo.
A lo largo de esta publicación, asumimos un conocimiento general práctico de Spark y su estructura, pero esta publicación debe ser accesible para todos los niveles.
¡Vamos a sumergirnos!
Rápidamente, repasemos qué hace Spark…
Spark es un motor de procesamiento de big data. Toma python/java/scala/R/SQL y convierte ese código en un conjunto de transformaciones altamente optimizado.
En su nivel más bajo, Spark crea tareas, que son transformaciones paralelizables en particiones de datos. Luego, estas tareas se distribuyen desde un nodo controlador hasta los nodos trabajadores, que son responsables de aprovechar sus núcleos de CPU para completar las transformaciones. Al distribuir tareas a potencialmente muchos trabajadores, Spark nos permite escalar horizontalmente y, por lo tanto, admitir canales de datos complejos que serían imposibles en una sola máquina.
Ok, es de esperar que no todo eso sea información nueva. De cualquier manera, en las siguientes secciones iremos un poco más despacio. Estos consejos deberían ayudar tanto a los principiantes como a los intermedios en el uso de chispas.
Spark es complejo. Para ayudarle a usted y a otros potencialmente a comprender su estructura, aprovechemos una analogía impresionantemente buena tomada de teoría de colas: Spark es una tienda de comestibles.
Cuando se piensa en el componente informático distribuido de Spark, hay tres componentes principales….
- Particiones de datos: subconjuntos de filas de nuestros datos. En nuestra tienda de comestibles, son comestibles.
- Tareas de chispa: transformaciones de bajo nivel realizadas en una partición de datos. En nuestra tienda de comestibles, son clientes.
- Núcleos: la parte de su(s) procesador(es) que funcionan en paralelo. En nuestra tienda de comestibles, son cajeros.
¡Eso es todo!
Ahora, aprovechemos estos conceptos para hablar sobre algunos fundamentos de Spark.
Como se muestra en la figura 3, nuestros cajeros (núcleos) solo pueden procesar un cliente (tarea) a la vez. Además, algunos clientes tienen muchos comestibles (recuento de filas de partición), como lo muestra el primer cliente en el cajero 2. A partir de estas simples observaciones…
- Cuantos más cajeros (núcleos), más clientes (tareas) podrá procesar en paralelo. Esto es escala horizontal/vertical.
- Si no tiene suficientes clientes (tareas) para saturar sus cajeros (núcleos), pagará para que el cajero se quede ahí. Esto se relaciona con escalado automáticotamaño del clúster y tamaño de la partición.
- Si los clientes (tareas) tienen cantidades muy diferentes de alimentos (recuentos de filas de partición), verá una utilización desigual de sus cajeros. Esto es sesgo de datos.
- Cuanto mejores sean sus cajeros (núcleos), más rápido podrán procesar un solo cliente (tarea). Esto se relaciona con la actualización de su procesador.
- etc.
Dado que la analogía proviene de la teoría de colas, un campo directamente relacionado con la computación distribuida, ¡es bastante poderosa!
Utilice esta analogía para depurar, comunicar y desarrollar Spark.
El error más común entre los principiantes en Spark es malinterpretar la evaluación perezosa.
Evaluación perezosa significa que no se realizarán transformaciones de datos hasta que invoque una colección en la memoria. Ejemplos de métodos que invocan una colección incluyen, entre otros…
- .recolectar(): trae el DataFrame a la memoria como una lista de Python.
- .espectáculo(): imprimir el primero
n
filas de su DataFrame. - .contar(): obtiene el número de filas de su DataFrame.
- .primero(): obtiene la primera fila de su DataFrame.
El método de cobro incorrecto más común es aprovechar .count()
a lo largo de un programa. Cada vez que invocas una colección, todas las transformaciones ascendentes se volverán a calcular desde cero, por lo que si tienes 5 invocaciones de .count()
su programa se ejecutará asintóticamente 5 veces más.
¡Spark se evalúa perezosamente! Las canalizaciones deben tener un flujo único desde el(los) origen(es) hasta el(los) destino(s).
Un problema sorprendentemente común que surge cuando se trabaja con organizaciones grandes es que pierden de vista el panorama general y, por lo tanto, optimizan los procesos de manera ineficiente.
Así es como se deben optimizar las canalizaciones para la mayoría de los casos de uso…
- Preguntar si necesitamos hacer el proyecto. En pocas palabras, piense en lo que realmente obtiene al optimizar una canalización. Si espera mejorar el tiempo de ejecución en un 20% y ejecutar el proceso cuesta $100, ¿debería invertir su extremadamente costoso salario de ingeniero de datos para ahorrar $20 por ejecución? Tal vez. Tal vez no.
- Busque frutos maduros en el código. Después de aceptar realizar el proyecto, verifique si el código tiene fallas obvias. Algunos ejemplos son el mal uso de la evaluación diferida, las transformaciones innecesarias y el orden incorrecto de las transformaciones.
- Haga que el trabajo se ejecute según el SLA aprovechando la computación. Después de comprobar que el código es relativamente eficiente, simplemente aplique cálculo al problema para que pueda 1) cumplir con el SLA y 2) recopilar estadísticas de la interfaz de usuario de Spark.
- Detener. Si está saturando adecuadamente su computación y el costo no es atroz, realice algunas mejoras informáticas de último momento y luego deténgase. Tu tiempo es valioso. No lo desperdicie ahorrando dólares cuando podría estar generando miles de dólares en otro lugar.
- Bucear profundo. Finalmente, si realmente necesita profundizar porque el costo es inaceptable, arremangarse y optimizar los datos, el código y la computación.
La belleza de este marco es que 1 a 4 solo requieren un conocimiento superficial de Spark y son muy rápidos de ejecutar; A veces puedes recopilar información sobre los pasos 1 a 4 durante una llamada de 30 minutos. El marco también garantiza que pararemos tan pronto como estemos suficientemente bueno. Finalmente, si es necesario el paso 5, podemos delegarlo a aquellos del equipo que sean más fuertes en chispa.
Al encontrar todas las formas de evitar optimizar excesivamente una canalización, está ahorrando valiosas horas de desarrollador.
La pérdida de disco es la razón más común por la que los trabajos Spark se ejecutan con lentitud.
Es un concepto muy simple. Spark está diseñado para aprovechar el procesamiento en memoria. Si no tiene suficiente memoria, Spark intentará escribir los datos adicionales en el disco para evitar que el proceso falle. Esto se llama derrame de disco.
Escribir y leer desde el disco es lento, por lo que se debe evitar. Si desea aprender cómo identificar y mitigar derrames, siga este tutorial. Sin embargo, algunos métodos muy comunes y simples para mitigar el derrame son…
- Procese menos datos por tarea, lo que se puede lograr cambiando el recuento de particiones mediante spark.shuffle.particiones o distribución.
- Aumente la proporción de RAM a núcleo en su computadora.
Si desea que su trabajo se realice de manera óptima, evite derrames.
Ya sea que esté usando Scala, Java, Python, SQL o R, Spark siempre aprovechará las mismas transformaciones internas. Por lo tanto, utilice el lenguaje adecuado para su tarea.
¡SQL es el “lenguaje” menos detallado de todos los lenguajes Spark compatibles para muchas operaciones! Más tangiblemente:
- Si está agregando o modificando una columna, use seleccioneExpr o exprésespecialmente combinado con Python cuerdas f.
- Si necesita SQL complejo, cree vistas temporales y luego use chispa.sql().
Aquí hay dos ejemplos rápidos…
# Column rename and cast with SQL
df = df.selectExpr((f"{c}::int as {c}_abc" for c in df.columns))# Column rename and cast with native spark
for c in df.columns:
df = df.withColumn(f"{c}_abc", F.col(c).cast("int")).drop(c)
# Window functions with SQL
df.withColumn("running_total", expr(
"sum(value) over (order by id rows between unbounded preceding and current row)"
))# Window functions with native spark
windowSpec = Window.orderBy("id").rowsBetween(Window.unboundedPreceding, Window.currentRow)
df_with_running_total_native = df.withColumn("running_total", F.sum("value").over(windowSpec))
Utilice SQL.
¿Necesita leer un montón de archivos de datos almacenados en un directorio complejo? Si es así, usa el poder extremadamente poderoso de Spark. leer opciones.
La primera vez que encontré este problema, reescribí os.walk trabajar con mi proveedor de nube donde se almacenaron los datos. Con mucho orgullo le mostré este método a mi socio de proyecto, quien simplemente dijo: “déjame compartir mi pantalla” y procedió a presentarme los filtros globales.
# Read all parquet files in the directory (and subdirectories)
df = spark.read.load(
"examples/src/main/resources/dir1",
format="parquet",
pathGlobFilter="*.parquet"
)
Cuando apliqué el filtro global que se muestra arriba en lugar de mi os.walk personalizado, la operación de ingesta fue 10 veces más rápida.
Spark tiene parámetros poderosos. Compruebe si existe la funcionalidad deseada antes de crear implementaciones personalizadas.
Los bucles casi siempre son perjudiciales para el rendimiento de la chispa. Este es el por qué…
Spark tiene dos fases principales: planificación y ejecución. En la fase de planificación, Spark crea un gráfico acíclico dirigido (DAG) que indica cómo se llevarán a cabo las transformaciones especificadas. La fase de planificación es relativamente costosa y a veces puede tardar varios segundos, por lo que conviene invocarla con la menor frecuencia posible.
Analicemos un caso de uso en el que debe iterar a través de muchos DataFrames, realizar transformaciones costosas y luego agregarlos a una tabla.
En primer lugar, existe soporte nativo para casi todos los casos de uso iterativos, específicamente UDF de pandas, funciones de ventana y uniones. Pero, si realmente necesita un bucle, así es como puede invocar una única fase de planificación y así obtener todas las transformaciones en un solo DAG.
import functools
from pyspark.sql import DataFramepaths = get_file_paths()
# BAD: For loop
for path in paths:
df = spark.read.load(path)
df = fancy_transformations(df)
df.write.mode("append").saveAsTable("xyz")
# GOOD: functools.reduce
lazily_evaluated_reads = (spark.read.load(path) for path in paths)
lazily_evaluted_transforms = (fancy_transformations(df) for df in lazily_evaluated_reads)
unioned_df = functools.reduce(DataFrame.union, lazily_evaluted_transforms)
unioned_df.write.mode("append").saveAsTable("xyz")
La primera solución utiliza un bucle for para iterar sobre rutas, realizar transformaciones sofisticadas y luego agregarlas a nuestra tabla delta de interés. En el segundo, almacenamos una lista de DataFrames evaluados de forma diferida, aplicamos transformaciones sobre ellos, luego los reducimos mediante una unión, realizamos un único plan de chispa y escribimos.
De hecho, podemos ver la diferencia en la arquitectura en el backend a través de la interfaz de usuario de Spark…
En la figura 5, el DAG de la izquierda correspondiente al bucle for tendrá 10 etapas. Sin embargo, el DAG de la derecha correspondiente a functools.reduce
tendrá una sola etapa y por lo tanto podrá procesarse más fácilmente en paralelo.
Para un caso de uso simple de leer 400 tablas delta únicas y luego agregarlas a una tabla delta, este método fue 6 veces más rápido que un bucle for.
Sea creativo para crear un DAG de chispa única.
No se trata de exageraciones.
Spark es un software bien establecido y, por lo tanto, bien documentado. Los LLM, específicamente GPT-4, son realmente buenos para sintetizar información compleja en explicaciones digeribles y concisas. Desde el lanzamiento de GPT-4, no he realizado un proyecto Spark complejo en el que no dependiera en gran medida de GPT-4.
Sin embargo, afirmando lo (con suerte) obvio, tenga cuidado con los LLM. Todo lo que envíe a un modelo de código cerrado puede convertirse en datos de capacitación para la organización matriz; asegúrese de no enviar nada confidencial. Además, valide que la salida de GPT sea legítima.
Cuando se usan correctamente, los LLM cambian las reglas del juego para impulsar el aprendizaje y el desarrollo. Vale $20/mes.