Implementación de ParDo y DoFn en Apache Beam en Detalles

Implementación de ParDo y DoFn en Apache Beam

Foto de ODISSEI en Unsplash

Explicación detallada del código para principiantes

Escribí un tutorial sobre algunas funciones de transformación comunes en Apache Beam en un tutorial anterior que cubría map, filter y combinePerKey(). Este tutorial será sobre la transformación ParDo, que no es más que otra forma de hacer Map. Pero la diferencia es que ParDo aplica la transformación en cada PCollection y devuelve cero o más elementos a la PCollection de salida. Por otro lado, la transformación Map genera exactamente un elemento para cada elemento de entrada. De esa manera, ParDo nos proporciona mucha flexibilidad para trabajar.

Otro aspecto importante de la transformación Pardo es que requiere el código del usuario en forma de DoFn. Veamos algunos ejemplos.

No dudes en descargar este conjunto de datos públicos y seguir:

Datos de muestra de ventas | Kaggle

Utilicé un cuaderno de Google Colab para trabajar con este código, por lo que es muy fácil de instalar. Aquí está el código para instalarlo:

!pip install --quiet apache_beam

Creé un directorio llamado ‘data’ para colocar el archivo CSV que usaremos y para colocar las salidas de nuestro ejercicio hoy.

mkdir -p data

Para comenzar, solo trabajaré en la cosa más simple del conjunto de datos. Leer el conjunto de datos y crear una lista con cada fila del conjunto de datos y enviarlas a un archivo de texto.

Leer un archivo de texto en un pipeline de beam es muy simple y directo. Tenemos un archivo CSV. Entonces, definiremos una clase CustomCoder() para esto, que codifica los objetos en una cadena de bytes primero, luego decodifica los bytes en sus objetos y, por último, especifica si el codificador está garantizado para codificar valores de manera determinista. Aquí está la documentación para el codificador.

from apache_beam.coders.coders import Coderclass CustomCoder(Coder):    """Un codificador personalizado utilizado para leer y escribir cadenas como UTF-8."""    def encode(self, value):        return value.encode("utf-8", "replace")    def decode(self, value):        return value.decode("utf-8", "ignore")    def is_deterministic(self):        return True

También hay una clase SplitRow() que simplemente utiliza la función .split() de Python.

class SplitRow(beam.DoFn):  def process(self, element)…

We will continue to update Zepes; if you have any questions or suggestions, please contact us!

Share:

Was this article helpful?

93 out of 132 found this helpful

Discover more

Inteligencia Artificial

Investigación de AI de SalesForce ha desarrollado ProGen Un gran avance en la ingeniería de proteínas mediante el uso de inteligencia artificial.

El desarrollo de proteínas funcionales ha sido durante mucho tiempo una búsqueda crítica en diversos campos científic...

Inteligencia Artificial

Destilando lo que sabemos

Los investigadores buscan reducir el tamaño de los modelos GPT grandes.

Noticias de Inteligencia Artificial

¡No más trampas! ¡Sapia.ai detecta respuestas generadas por inteligencia artificial en tiempo real!

En un emocionante avance, Sapia.ai ha presentado una nueva función que puede identificar y marcar respuestas creadas ...