Optimización del tamaño del archivo de salida en Apache Spark

Optimización tamaño archivo salida Apache Spark

Una Guía Completa sobre la Gestión de Particiones, Repartición y Operaciones de Coalesce

Foto de zhao chen en Unsplash

Imagínate al mando de una gran operación de procesamiento de datos en Spark. Una regla general mencionada a menudo en el discurso de optimización de Spark es que, para obtener el mejor rendimiento de E/S y una mayor paralelismo, cada archivo de datos debe tener un tamaño aproximado de 128Mb, que es el tamaño de partición predeterminado al leer un archivo [1].

Imagina tus archivos como embarcaciones navegando en el mar del procesamiento de datos. Si las embarcaciones son demasiado pequeñas, pierden mucho tiempo atracando y volviendo a zarpar, una metáfora para el motor de ejecución que gasta tiempo adicional en abrir archivos, listar directorios, obtener metadatos de objetos, configurar transferencia de datos y leer archivos. Por otro lado, si tus embarcaciones son demasiado grandes y no utilizas los muchos muelles del puerto, tienen que esperar a un único proceso prolongado de carga y descarga, una metáfora para el procesamiento de consultas que espera hasta que un único lector haya terminado de leer el archivo completo, lo que reduce el paralelismo [fig. 1].

Fig. 1 — Imagen del autor

Para ilustrar vívidamente la importancia de la optimización del tamaño de archivo, consulta la siguiente figura. En este ejemplo específico, cada tabla contiene 8 GB de datos.

Sin embargo, navegar por este delicado equilibrio no es tarea fácil, especialmente al tratar con grandes trabajos por lotes. Puedes sentir que has perdido el control sobre el número de archivos de salida. Esta guía te ayudará a recuperarlo.

La Clave para Entender: Particiones

El número de archivos de salida guardados en el disco es igual al número de particiones en los ejecutores de Spark cuando se realiza la operación de escritura. Sin embargo, determinar el número de particiones antes de realizar la operación de escritura puede ser complicado.

Cuando se lee una tabla, Spark por defecto lee bloques con un tamaño máximo de 128Mb (aunque esto se puede cambiar con sql.files.maxPartitionBytes). Por lo tanto, el número de particiones depende del tamaño de entrada. Sin embargo, en realidad, el número de particiones probablemente será igual al parámetro sql.shuffle.partitions. Este número tiene un valor predeterminado de 200, pero para cargas de trabajo más grandes, rara vez es suficiente. Mira este video para aprender cómo establecer el número ideal de particiones de shuffle.

El número de particiones en los ejecutores de Spark es igual a sql.shuffle.partitions si hay al menos una transformación amplia (wide transformation) en la ETL. Si solo se aplican transformaciones estrechas (narrow transformations), el número de particiones sería igual al número creado al leer el archivo.

Establecer el número de particiones de shuffle nos da un control de alto nivel sobre las particiones totales solo cuando se trata de tablas no particionadas. Una vez que entramos en el territorio de las tablas particionadas, cambiar el parámetro sql.shuffle.partitions no modificará fácilmente el tamaño de cada archivo de datos.

El Volante: Repartición y Coalesce

Existen dos formas principales de administrar el número de particiones en tiempo de ejecución: repartition() y coalesce(). Aquí hay una breve descripción:

  • Repartición: repartition(partitionCols, n_partitions) es una transformación perezosa (lazy transformation) con dos parámetros: el número de particiones y la(s) columna(s) de particionamiento. Cuando se realiza, Spark redistribuye las particiones en el clúster según la columna de particionamiento. Sin embargo, una vez que se guarda la tabla, se pierde la información sobre la repartición. Por lo tanto, esta útil información no se utilizará al leer el archivo.
df = df.repartition("nombre_columna", n_partitions)
  • Coalesce: coalesce(num_partitions) es también una transformación perezosa, pero solo toma un argumento: el número de particiones. Es importante destacar que la operación coalesce no redistribuye los datos en el clúster, por lo tanto es más rápido que repartition. Además, coalesce solo puede reducir el número de particiones, no funcionará si se intenta aumentar el número de particiones.
df = df.coalesce(num_partitions)

La idea principal aquí es que el uso del método coalesce generalmente es más beneficioso. Esto no quiere decir que la redistribución no sea útil; ciertamente lo es, especialmente cuando necesitamos ajustar el número de particiones en un dataframe durante la ejecución.

En mi experiencia con procesos ETL, donde trabajo con múltiples tablas de diferentes tamaños y realizo transformaciones y joins complejos, he encontrado que sql.shuffle.partitions no ofrece el control preciso que necesito. Por ejemplo, usar el mismo número de particiones de redistribución para unir dos tablas pequeñas y dos tablas grandes en el mismo ETL sería ineficiente, lo que resultaría en un exceso de particiones pequeñas para las tablas pequeñas o un número insuficiente de particiones para las tablas grandes. La redistribución también tiene el beneficio adicional de ayudarme a evitar problemas con joins desequilibrados y datos desequilibrados [2].

Dicho esto, la redistribución es menos adecuada antes de escribir la tabla en el disco y, en la mayoría de los casos, se puede reemplazar con coalesce. Coalesce tiene la ventaja sobre la redistribución antes de escribir en el disco por un par de razones:

  1. Evita una redistribución innecesaria de datos en el clúster.
  2. Permite el ordenamiento de datos según una heurística lógica. Cuando se utiliza el método de redistribución antes de escribir, los datos se redistribuyen en el clúster, lo que provoca una pérdida en su orden. En cambio, al usar coalesce, se mantiene el orden ya que los datos se agrupan en lugar de redistribuirse.

Veamos por qué el ordenamiento de los datos es crucial.

Orden en el horizonte: Importancia del ordenamiento de datos

Mencionamos anteriormente cómo cuando aplicamos el método repartition, Spark no guarda la información de particionamiento en los metadatos de la tabla. Sin embargo, al tratar con big data, esta es una pieza crucial de información por dos motivos:

  1. Permite escanear la tabla mucho más rápidamente en el momento de la consulta.
  2. Permite una mejor compresión, si se trata de un formato compresible (como parquet, CSV, Json, etc). Este es un gran artículo para entender por qué.

La idea principal es ordenar los datos antes de guardarlos. La información se retendrá en los metadatos y se utilizará en el momento de la consulta, lo que hace que la consulta sea mucho más rápida.

Ahora veamos las diferencias entre guardar en una tabla no particionada y una tabla particionada, y por qué guardar en una tabla particionada requiere algunos ajustes adicionales.

Gestión del tamaño de archivo en tablas particionadas

Cuando se trata de tablas no particionadas, gestionar el número de archivos durante la operación de guardar es un proceso directo. Utilizar el método coalesce antes de guardar logrará la tarea, independientemente de si los datos están ordenados o no.

# Ejemplo de uso del método coalesce antes de guardar una tabla no particionadadf.coalesce(10).write.format("parquet").save("/ruta/de/salida")

Sin embargo, este método no es efectivo al manejar tablas particionadas, a menos que los datos estén ordenados antes de hacer el coalesce. Para entender por qué sucede esto, debemos adentrarnos en las acciones que tienen lugar dentro de los ejecutores de Spark cuando los datos están ordenados versus cuando no lo están [fig.2].

Fig. 2 — Imagen del autor

Por lo tanto, el proceso estándar para guardar datos en una tabla particionada debería ser:

# Ejemplo de uso del método coalesce después de ordenar los datos en una tabla particionadadf.orderBy("nombreColumna").coalesce(10).write.format("parquet").save("/ruta/de/salida_particionada")

Otros Ayudas Navegacionales

Más allá de repartition y coalesce, es posible que encuentres útil maxnumberofrecords. Es un método práctico para evitar que los archivos se vuelvan demasiado grandes y se puede utilizar junto con los métodos anteriores.

df.write.option("maxRecordsPerFile", 50000).save("ruta_del_archivo")

Conclusiones Finales

El dominio del tamaño de los archivos en un trabajo de Spark a menudo implica prueba y error. Es fácil pasar por alto la optimización en una era en la que el espacio de almacenamiento es barato y la potencia de procesamiento está a solo un clic de distancia. Pero a medida que el procesamiento de terabytes y petabytes de datos se convierte en la norma, olvidar estas simples técnicas de optimización puede tener costos significativos en términos monetarios, de tiempo y ambientales.

Espero que este artículo te capacite para realizar ajustes eficientes en tus procesos de ETL. Como un experimentado capitán de mar, que navegues por las aguas de Spark con confianza y claridad.

We will continue to update Zepes; if you have any questions or suggestions, please contact us!

Share:

Was this article helpful?

93 out of 132 found this helpful

Discover more

Inteligencia Artificial

Estos ingeniosos drones pueden unirse en el aire para formar un robot más grande y fuerte

Investigadores de la Universidad de Tokio en Japón han desarrollado drones que pueden ensamblar y desmontar en pleno ...

Inteligencia Artificial

Utilice un modelo de base de IA generativa para la síntesis y respuesta a preguntas utilizando sus propios datos

Los modelos de lenguaje grandes (LLMs) se pueden utilizar para analizar documentos complejos y proporcionar resúmenes...

Inteligencia Artificial

Varias filtraciones de datos en 23andMe

Datos genéticos robados llevan a una demanda colectiva contra la empresa de pruebas.

Inteligencia Artificial

Tesla retira 2 millones de autos con controles de seguridad de 'autoguiado' insuficientes

Tesla está retirando más de 2 millones de vehículos para solucionar los sistemas de Autopilot que los reguladores gub...

Inteligencia Artificial

OpenAI presenta 6 emocionantes características de ChatGPT para revolucionar la experiencia del usuario

OpenAI, la empresa pionera detrás de ChatGPT, continúa innovando y mejorando la experiencia del usuario con seis emoc...

Inteligencia Artificial

Más allá de los límites humanos El surgimiento de la SuperInteligencia

De ANI a AGI y más allá Descifrando el camino evolutivo de la IA.