Escalado dinámico de hilos

El escalado dinámico de hilos forma parte del conjunto de funciones de escalado vertical de Dataflow. Complementa la función de autoescalado horizontal de Dataflow ajustando el número de tareas paralelas, también conocidas como paquetes, que ejecuta cada trabajador de Dataflow. El objetivo es aumentar la eficiencia general de tu canalización de Dataflow.

Cuando Dataflow ejecuta un flujo de procesamiento, el procesamiento se distribuye entre varias máquinas virtuales (VMs) de Compute Engine, también conocidas como trabajadores. Un hilo es una tarea ejecutable que se ejecuta en un proceso más grande. Dataflow inicia varios hilos en cada trabajador.

Si el escalado dinámico de subprocesos está habilitado, el servicio Dataflow elige automáticamente el número de subprocesos adecuado para ejecutar en cada trabajador de Dataflow. Como cada hilo ejecuta una tarea, aumentar el número de hilos permite que se ejecuten más tareas en paralelo en un trabajador. Si usas esta función junto con la de escalado automático horizontal, el número total de subprocesos que utiliza la canalización sigue siendo el mismo, pero se usan menos trabajadores.

El escalado dinámico de subprocesos usa un algoritmo para determinar cuántos subprocesos necesita cada trabajador en función de las señales de uso de recursos generadas durante la ejecución de la canalización. Para obtener más información, consulta la sección Cómo funciona de esta página.

Ventajas

El escalado dinámico de hilos tiene las siguientes ventajas potenciales:

  • Permite que los trabajadores de Dataflow procesen los datos de forma más eficiente al mejorar el uso de la CPU y la memoria por trabajador.
  • Mejora el procesamiento en paralelo ajustando el número de hilos de trabajador disponibles para ejecutar tareas en paralelo durante la ejecución de la canalización.
  • Reduce el número de trabajadores necesarios para procesar grandes conjuntos de datos, lo que puede reducir los costes.

Compatibilidad y limitaciones

  • El escalado dinámico de hilos está disponible para las canalizaciones que usan los SDKs de Java, Python y Go.
  • La tarea de Dataflow debe usar Runner v2.
  • Solo se admiten las canalizaciones por lotes.
  • Las canalizaciones que consumen muchos recursos de CPU o memoria no se benefician del escalado dinámico de subprocesos.
  • El escalado dinámico de subprocesos no reduce el tiempo que tarda en completarse un trabajo de Dataflow.

Cómo funciona

El escalado dinámico de subprocesos usa principios de ajuste automático para escalar dinámicamente el número de subprocesos de cada trabajador del grupo de trabajadores de Dataflow. El número de subprocesos se escala de forma independiente en cada trabajador. Cada hilo ejecuta una tarea. Si aumentas el número de hilos, se podrán ejecutar más tareas en paralelo en un trabajador. A medida que se completan las tareas y ya no se necesitan los hilos, el recuento de hilos se reduce. Un algoritmo determina cuántos subprocesos necesita cada trabajador.

El número de hilos de un trabajador aumenta hasta un máximo de dos hilos por vCPU cuando se cumplen las dos condiciones siguientes:

  • El uso de memoria del trabajador es inferior al 50%.
  • El uso de CPU del trabajador es inferior al 65%.

El número de subprocesos de un trabajador se reduce a un mínimo de un subproceso por vCPU cuando se cumple la siguiente condición:

  • El uso de memoria en el trabajador es superior al 70%.

Para ver el uso de la memoria y la CPU de tu tarea, usa la pestaña Métricas de la tarea de la interfaz web de Dataflow.

Para asegurarse de que las recomendaciones sean válidas, Dataflow espera a que se estabilice la utilización de recursos antes de enviar recomendaciones a los trabajadores. Por ejemplo, el uso de la memoria y la CPU puede estar dentro del intervalo de escalado, pero, como el uso de los recursos sigue aumentando, Dataflow no envía ninguna recomendación. Cuando se estabiliza el uso de recursos, Dataflow envía una recomendación.

Si se produce un error de falta de memoria (OOM), el escalado de subprocesos se inhabilita automáticamente y la canalización se ejecuta con un subproceso por vCPU.

Habilitar el escalado dinámico de hilos

Para habilitar el escalado dinámico de hilos, usa la siguiente opción de servicio de Dataflow.

Java

--dataflowServiceOptions=enable_dynamic_thread_scaling

Python

--dataflow_service_options=enable_dynamic_thread_scaling

Go

--dataflow_service_options=enable_dynamic_thread_scaling

Si el escalado dinámico de hilos está habilitado, también puedes definir el número inicial y máximo de trabajadores disponibles para tu canalización durante la ejecución. Para obtener más información, consulta Opciones de canalización.

Verificar que el escalado dinámico de subprocesos esté habilitado

Cuando el escalado dinámico de hilos está habilitado, aparece el siguiente mensaje en los archivos de registro de los trabajadores:

Enabling thread vertical scaling feature in worker.

Para ver los archivos de registro de tu trabajador, en el Explorador de registros, usa el panel Consulta para filtrar los registros por Nombre de registro. Usa el siguiente nombre de registro en tu filtro:

projects/PROJECT_ID/logs/dataflow.googleapis.com%2Fharness

Puedes ver el número recomendado de subprocesos en los archivos de registro de los trabajadores. El siguiente mensaje incluye el número de hilos recomendado:

worker_thread_scaling_report_response { recommended_thread_count: NUMBER }

Si la utilización de recursos no se encuentra en el intervalo de escalado, el valor mostrado será igual al número de vCPUs del trabajador.

También puedes usar la Google Cloud consola para ver si el escalado dinámico de subprocesos está habilitado. Cuando está habilitada, en el panel Información de la tarea de Dataflow, en la fila dataflowServiceOptions de la sección Opciones de la canalización, se muestra enable_dynamic_thread_scaling.

Solución de problemas

En esta sección se proporcionan instrucciones para solucionar problemas habituales relacionados con el escalado dinámico de subprocesos.

El rendimiento se reduce cuando el escalado dinámico de hilos está habilitado

Si aumentas el número de subprocesos, pueden producirse problemas de rendimiento en los siguientes casos:

  • Cuando varios procesos intentan usar el mismo recurso, uno de ellos puede usarlo mientras que los demás deben esperar. Esta situación se conoce como contención de recursos. Cuando se produce una contención de recursos, el rendimiento de la canalización puede disminuir.
  • Cuando se producen errores de falta de memoria, se inhabilita el escalado dinámico de hilos. En algunos casos, los errores de falta de memoria pueden provocar que la canalización falle.

Verifica si el recuento de subprocesos ha aumentado. Para obtener información sobre cómo verificar el número de subprocesos recomendado, consulta la sección Verificar que el escalado de subprocesos esté habilitado de esta página.

Si el escalado de hilos está habilitado, para resolver este problema, cuando ejecutes tu pipeline, no incluyas la opción de servicio de escalado dinámico de hilos.

Trabajador unificado… habilitado e inhabilitado

Después de habilitar el escalado dinámico de hilos, es posible que tu trabajo falle y se muestre el siguiente error:

The workflow could not be created. Causes: (ID): Unified worker misconfigured by user and was both enabled and disabled.

Este error se produce cuando Runner v2 está inhabilitado explícitamente.

Para solucionar este problema, habilita Runner v2. Para obtener más información, consulta la sección Habilitar Dataflow Runner v2 de la página "Usar Dataflow Runner V2".

Actualizar el SDK

Después de habilitar el escalado dinámico de hilos, es posible que tu trabajo falle y se muestre el siguiente error:

Java

Dataflow Runner v2 requires the Apache Beam Java SDK version 2.29.0 or higher. Please upgrade your SDK and resubmit your job.

Python

Dataflow Runner v2 requires the Apache Beam SDK, version 2.21.0 or higher. Please upgrade your SDK and resubmit your job.

Este error se produce cuando no se puede habilitar Runner v2 porque la versión del SDK no es compatible.

Para resolver este problema, usa una versión del SDK que sea compatible con Runner v2.

No se puede habilitar la función de escalado vertical de la conversación

Después de habilitar el escalado dinámico de hilos, es posible que tu trabajo falle y se muestre el siguiente error:

The workflow could not be created. Causes: (ID): Thread vertical scaling feature can not be enabled while number_of_worker_harness_threads is specified.

Este error se produce cuando la canalización define explícitamente el número de subprocesos por trabajador mediante la opción numberOfWorkerHarnessThreads o number_of_worker_harness_threads pipeline.

Para solucionar este problema, quite la opción de canalización numberOfWorkerHarnessThreads o number_of_worker_harness_threads de su canalización.