Desplegar flujos de procesamiento de Dataflow

En este documento se ofrece una descripción general de la implementación de flujos de trabajo y se destacan algunas de las operaciones que puedes realizar en un flujo de trabajo implementado.

Ejecutar un flujo de procesamiento

Después de crear y probar tu flujo de procesamiento de Apache Beam, ejecútalo. Puedes ejecutar tu flujo de procesamiento de forma local, lo que te permite probar y depurar tu flujo de procesamiento de Apache Beam, o en Dataflow, un sistema de procesamiento de datos disponible para ejecutar flujos de procesamiento de Apache Beam.

Ejecutar localmente

Ejecuta tu flujo de procesamiento de forma local.

Java

El siguiente código de ejemplo, extraído de la guía de inicio rápido, muestra cómo ejecutar la canalización WordCount de forma local. Para obtener más información, consulta cómo ejecutar tu canalización de Java de forma local.

En el terminal, ejecuta el siguiente comando:

  mvn compile exec:java \
      -Dexec.mainClass=org.apache.beam.examples.WordCount \
      -Dexec.args="--output=counts"
  

Python

El siguiente código de ejemplo, extraído de la guía de inicio rápido, muestra cómo ejecutar la canalización WordCount de forma local. Para obtener más información, consulta cómo ejecutar tu canalización de Python de forma local.

En el terminal, ejecuta el siguiente comando:

python -m apache_beam.examples.wordcount \ --output outputs

Go

El siguiente código de ejemplo, extraído de la guía de inicio rápido, muestra cómo ejecutar la canalización WordCount de forma local. Para obtener más información, consulta cómo ejecutar tu canalización de Go de forma local.

En el terminal, ejecuta el siguiente comando:

    go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputs
  

Aprende a ejecutar tu flujo de procesamiento de forma local en tu máquina con el ejecutor directo.

Ejecutar en Dataflow

Ejecuta tu flujo de procesamiento en Dataflow.

.

Java

El siguiente código de ejemplo, extraído de la guía de inicio rápido, muestra cómo ejecutar la canalización WordCount en Dataflow. Para obtener más información, consulta cómo ejecutar tu canalización de Java en Dataflow.

En el terminal, ejecuta el siguiente comando (desde el directorio word-count-beam):

  mvn -Pdataflow-runner compile exec:java \
    -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Dexec.args="--project=PROJECT_ID \
    --gcpTempLocation=gs://BUCKET_NAME/temp/ \
    --output=gs://BUCKET_NAME/output \
    --runner=DataflowRunner \
    --region=REGION"
    

Haz los cambios siguientes:

  • PROJECT_ID: tu ID de proyecto Google Cloud
  • BUCKET_NAME: el nombre de tu segmento de Cloud Storage
  • REGION: una región de Dataflow, como us-central1

Python

El siguiente código de ejemplo, extraído de la guía de inicio rápido, muestra cómo ejecutar la canalización WordCount en Dataflow. Para obtener más información, consulta cómo ejecutar tu flujo de procesamiento de Python en Dataflow.

En el terminal, ejecuta el siguiente comando:

python -m apache_beam.examples.wordcount \
    --region DATAFLOW_REGION \
    --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output gs://STORAGE_BUCKET/results/outputs \
    --runner DataflowRunner \
    --project PROJECT_ID \
    --temp_location gs://STORAGE_BUCKET/tmp/

Haz los cambios siguientes:

  • DATAFLOW_REGION: la región en la que quieres implementar el trabajo de Dataflow. Por ejemplo, europe-west1

    La marca --region anula la región predeterminada que se ha definido en el servidor de metadatos, en tu cliente local o en las variables de entorno.

  • STORAGE_BUCKET: el nombre de Cloud Storage que copiaste antes
  • PROJECT_ID: el Google Cloud ID de proyecto que has copiado anteriormente

Go

El siguiente código de ejemplo, extraído de la guía de inicio rápido, muestra cómo ejecutar la canalización WordCount en Dataflow. Para obtener más información, consulta cómo ejecutar tu canalización de Go en Dataflow.

En el terminal, ejecuta el siguiente comando:

  posix-terminal go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output gs://STORAGE_BUCKET/results/outputs \
    --runner dataflow \
    --project PROJECT_ID \
    --region DATAFLOW_REGION \
    --staging_location gs://STORAGE_BUCKET/binaries/
  

Haz los cambios siguientes:

  • STORAGE_BUCKET: el nombre del segmento de Cloud Storage.
  • PROJECT_ID: el ID del proyecto. Google Cloud
  • DATAFLOW_REGION: la región en la que quieres desplegar la tarea de Dataflow. Por ejemplo, europe-west1. Para ver una lista de las ubicaciones disponibles, consulta Ubicaciones de Dataflow. Ten en cuenta que la marca --region anula la región predeterminada que se ha definido en el servidor de metadatos, en tu cliente local o en las variables de entorno.

Consulta cómo ejecutar tu flujo de procesamiento en el servicio Dataflow con el ejecutor de Dataflow.

Cuando ejecutas tu flujo de procesamiento en Dataflow, este convierte el código de tu flujo de procesamiento de Apache Beam en una tarea de Dataflow. Dataflow gestiona por completo los servicios de Google Cloud Platform, como Compute Engine y Cloud Storage, para ejecutar tu trabajo de Dataflow, y activa y desactiva automáticamente los recursos necesarios. Puedes consultar más información sobre cómo convierte Dataflow tu código de Apache Beam en una tarea de Dataflow en el artículo Ciclo de vida de un flujo de procesamiento.

Validación de flujos de procesamiento

Cuando ejecutas tu flujo de procesamiento en Dataflow, antes de que se inicie el trabajo, Dataflow realiza pruebas de validación en el flujo. Cuando una prueba de validación detecta problemas en la canalización, Dataflow rechaza el envío de la tarea de forma anticipada. En los registros de trabajos, Dataflow incluye mensajes con el siguiente texto. Cada mensaje también incluye detalles sobre los resultados de la validación e instrucciones para resolver el problema.

The preflight pipeline validation failed for job JOB_ID.

Las pruebas de validación que se ejecutan dependen de los recursos y servicios que utilice tu trabajo de Dataflow.

  • Si la API de Uso de Servicio está habilitada en tu proyecto, las pruebas de validación de la canalización comprueban si los servicios necesarios para ejecutar tu trabajo de Dataflow están habilitados.
  • Si la API Cloud Resource Manager está habilitada en tu proyecto, las pruebas de validación de la canalización comprueban si tienes las configuraciones a nivel de proyecto necesarias para ejecutar tu trabajo de Dataflow.

Para obtener más información sobre cómo habilitar servicios, consulta el artículo Habilitar e inhabilitar servicios.

Para obtener información sobre cómo resolver problemas de permisos detectados durante la validación de la canalización, consulta La validación de la canalización ha fallado.

Si quieres anular la validación de la canalización e iniciar el trabajo con errores de validación, usa la siguiente opción de servicio de la canalización:

Java

--dataflowServiceOptions=enable_preflight_validation=false

Python

--dataflow_service_options=enable_preflight_validation=false

Go

--dataflow_service_options=enable_preflight_validation=false

Definir opciones de flujo de procesamiento

Puedes controlar algunos aspectos de cómo ejecuta Dataflow tu trabajo configurando opciones de flujo de procesamiento en el código del flujo de procesamiento de Apache Beam. Por ejemplo, puedes usar opciones de la canalización para definir si esta se ejecuta en máquinas virtuales de trabajador, en el backend del servicio Dataflow o de forma local.

Gestionar dependencias de la canalización

Muchos flujos de procesamiento de Apache Beam se pueden ejecutar con los entornos de ejecución de Dataflow predeterminados. Sin embargo, en algunos casos prácticos de procesamiento de datos, es útil usar bibliotecas o clases adicionales. En estos casos, puede que tengas que gestionar las dependencias de tu flujo de trabajo. Para obtener más información sobre cómo gestionar las dependencias, consulta el artículo Gestionar las dependencias de las canalizaciones en Dataflow.

Supervisar tu tarea

Dataflow ofrece visibilidad sobre tus tareas a través de herramientas como la interfaz de monitorización de Dataflow y la interfaz de línea de comandos de Dataflow.

Acceder a las VMs de trabajador

Para ver las instancias de máquina virtual de una canalización determinada, puedes usar laGoogle Cloud consola. Desde ahí, puedes usar SSH para acceder a cada instancia. Sin embargo, una vez que la tarea se completa o falla, el servicio Dataflow cierra y limpia automáticamente las instancias de VM.

Optimizaciones de tareas

Además de gestionar los Google Cloud recursos, Dataflow realiza y optimiza automáticamente muchos aspectos del procesamiento paralelo distribuido.

Paralelización y distribución

Dataflow particiona automáticamente tus datos y distribuye el código de los trabajadores en instancias de Compute Engine para que se procesen en paralelo. Para obtener más información, consulta Paralelización y distribución.

Optimizaciones de fusión y combinación

Dataflow usa el código de tu canalización para crear un gráfico de ejecución que representa los PCollections y las transformaciones de tu canalización, y optimiza el gráfico para conseguir el rendimiento y el uso de recursos más eficientes. Dataflow también optimiza automáticamente las operaciones que pueden ser costosas, como las agregaciones de datos. Para obtener más información, consulta Optimización de fusión y Optimización de combinación.

Funciones de sintonización automática

El servicio Dataflow incluye varias funciones que permiten ajustar sobre la marcha la asignación de recursos y la partición de datos. Estas funciones ayudan a Dataflow a ejecutar tu trabajo de la forma más rápida y eficiente posible. Entre estas funciones, se incluyen las siguientes:

Streaming Engine

De forma predeterminada, el ejecutor de canalizaciones de Dataflow ejecuta los pasos de tu canalización de streaming por completo en máquinas virtuales de trabajador, lo que consume CPU, memoria y almacenamiento de disco persistente de los trabajadores. Streaming Engine de Dataflow traslada la ejecución de flujos de procesamiento fuera de las máquinas virtuales de trabajadores y la ubica en el backend del servicio Dataflow. Para obtener más información, consulta Streaming Engine.

Programación flexible de recursos de Dataflow

Dataflow FlexRS reduce los costes del procesamiento por lotes mediante técnicas de programación avanzadas, el servicio Dataflow Shuffle y una combinación de instancias de máquina virtual (VM) interrumpibles y VMs estándar. Al ejecutar VMs interrumpibles y VMs normales en paralelo, Dataflow mejora la experiencia de usuario si Compute Engine detiene instancias de VMs interrumpibles durante un evento del sistema. FlexRS ayuda a asegurar que la canalización siga avanzando y que no pierdas el trabajo anterior cuando Compute Engine interrumpa temporalmente tus VMs no garantizadas. Para obtener más información sobre FlexRS, consulta el artículo sobre cómo usar la programación flexible de recursos en Dataflow.

VM blindada de Dataflow

A partir del 1 de junio del 2022, el servicio Dataflow usará máquinas virtuales protegidas para todos los trabajadores. Para obtener más información sobre las funciones de las VMs blindadas, consulta VM blindada.