En esta página, se proporciona información sobre el uso de memoria en las canalizaciones de Dataflow y los pasos para investigar y resolver problemas con errores de Dataflow fuera de memoria (OOM).
Acerca del uso de memoria de Dataflow
Para solucionar problemas de memoria insuficiente, es útil comprender cómo las canalizaciones de Dataflow usan la memoria.
Cuando Dataflow ejecuta una canalización, el procesamiento se distribuye entre varias máquinas virtuales (VMs) de Compute Engine, que a menudo se denominan trabajadores.
Los trabajadores procesan los elementos de trabajo del servicio de Dataflow y delega los elementos de trabajo a los procesos del SDK de Apache Beam. Un proceso del SDK de Apache Beam crea instancias de DoFn
. DoFn
es una clase del SDK de Apache Beam que define una función de procesamiento distribuido.
Dataflow inicia varios subprocesos en cada trabajador, y la memoria de cada trabajador se comparte en todos los subprocesos. Un subproceso es una única tarea ejecutable que se ejecuta dentro de un proceso más grande. La cantidad predeterminada de subprocesos depende de varios factores y varía entre los trabajos por lotes y de transmisión.
Si tu canalización necesita más memoria que la cantidad predeterminada de memoria disponible en los trabajadores, es posible que encuentres errores de memoria.
Las canalizaciones de Dataflow usan la memoria de un trabajador de tres maneras:
Memoria operativa del trabajador
Los trabajadores de Dataflow necesitan memoria para sus sistemas operativos y procesos del sistema. El uso de memoria del trabajador no suele superar 1 GB. El uso suele ser inferior a 1 GB.
- Varios procesos en el trabajador usan la memoria para garantizar que tu canalización esté en orden. Cada uno de estos procesos puede reservar una pequeña cantidad de memoria para su operación.
- Cuando tu canalización no usa Streaming Engine, los procesos de trabajadores adicionales usan la memoria.
Memoria del proceso del SDK
Los procesos del SDK de Apache Beam pueden crear objetos y datos que se comparten entre subprocesos dentro del proceso, denominados en esta página como objetos y datos compartidos del SDK. El uso de memoria de estos objetos y datos compartidos del SDK se conoce como memoria de proceso del SDK. En la siguiente lista, se incluyen ejemplos de objetos y datos compartidos del SDK:
- Datos de entrada adicionales
- Modelos de aprendizaje automático
- Objetos singleton en la memoria
- Objetos de Python creados con el
módulo
apache_beam.utils.shared
- Datos cargados desde fuentes externas, como Cloud Storage o BigQuery
Los trabajos de transmisión que no usan Streaming Engine almacenan entradas complementarias en la memoria. Para las canalizaciones de Java y Go, cada trabajador tiene una copia de la entrada complementaria. Para las canalizaciones de Python, cada proceso del SDK de Apache Beam tiene una copia de la entrada complementaria.
Los trabajos de transmisión que usan Streaming Engine tienen un límite de tamaño de entrada lateral de 80 MB. Las entradas complementarias se almacenan fuera de la memoria del trabajador.
El uso de memoria de los objetos y datos compartidos del SDK crece de forma lineal con la cantidad de procesos del SDK de Apache Beam. En las canalizaciones de Java y Go, se inicia un proceso del SDK de Apache Beam por trabajador. En las canalizaciones de Python, se inicia un proceso del SDK de Apache Beam por CPU virtual. Los objetos y datos compartidos del SDK se vuelven a usar en subprocesos dentro del mismo proceso del SDK de Apache Beam.
DoFn
uso de memoria
DoFn
es una clase del SDK de Apache Beam que define una función de procesamiento distribuido.
Cada trabajador puede ejecutar instancias de DoFn
en simultáneo. Cada subproceso ejecuta una instancia DoFn
. Cuando evalúas el uso total de la memoria, calcular el tamaño del conjunto de trabajo o la cantidad de memoria necesaria para que una aplicación continúe funcionando, puede ser útil. Por ejemplo, si una DoFn
individual usa un máximo de 5 MB de memoria y un trabajador tiene 300 subprocesos, el uso de memoria DoFn
puede alcanzar un máximo de 1.5 GB o la cantidad de bytes de memoria multiplicada por la cantidad de subprocesos. Según cómo los trabajadores usen la memoria, un aumento repentino en el uso podría provocar que los trabajadores se queden sin memoria.
Es difícil estimar cuántas instancias de un DoFn que Dataflow crea. La cantidad depende de varios factores, como el SDK, el tipo de máquina, etcétera. Además, varios subprocesos seguidos pueden usar DoFn.
El servicio de Dataflow no garantiza cuántas veces se invoca un DoFn
ni garantiza la cantidad exacta de instancias de DoFn
creadas en el transcurso de una canalización.
Sin embargo, la siguiente tabla proporciona estadísticas sobre el nivel de paralelismo que puedes esperar y estima un límite superior en la cantidad de instancias DoFn
.
SDK de Beam para Python
Lote | Transmisión sin Streaming Engine | Streaming Engine | |
---|---|---|---|
Paralelismo |
1 proceso por CPU virtual 1 subproceso por proceso 1 subproceso por CPU virtual
|
1 proceso por CPU virtual 12 subprocesos por proceso 12 subprocesos por CPU virtual |
1 proceso por CPU virtual 12 subprocesos por proceso 12 subprocesos por CPU virtual
|
Cantidad máxima de instancias DoFn simultáneas (todas estas cantidades están sujetas a cambios en cualquier momento). |
1 DoFn por subproceso
1
|
1 DoFn por subproceso
12
|
1 DoFn por subproceso
12
|
SDK de Beam para Java/Go
Lote | Transmisión sin Streaming Engine | Streaming Engine | |
---|---|---|---|
Paralelismo |
1 proceso por VM de trabajador 1 subproceso por CPU virtual
|
1 proceso por VM de trabajador 300 subprocesos por proceso 300 subprocesos por VM de trabajador
|
1 proceso por VM de trabajador 500 subprocesos por proceso 500 subprocesos por VM de trabajador
|
Cantidad máxima de instancias DoFn simultáneas (todas estas cantidades están sujetas a cambios en cualquier momento). |
1 DoFn por subproceso
1
|
1 DoFn por subproceso
300
|
1 DoFn por subproceso
500
|
Por ejemplo, cuando usas el SDK de Python con un trabajador de Dataflow n1-standard-2
, se aplica lo siguiente:
- Trabajos por lotes: Dataflow inicia un proceso por CPU virtual (dos en este caso). Cada proceso usa un subproceso, y cada subproceso crea una instancia
DoFn
. - Trabajos de transmisión con Streaming Engine: Dataflow inicia un proceso por vCPU (dos en total). Sin embargo, cada proceso puede generar hasta 12 subprocesos, cada uno con su propia instancia de DoFn.
Cuando diseñas canalizaciones complejas, es importante comprender el ciclo de vida de DoFn
.
Asegúrate de que tus funciones DoFn
sean serializables y evita modificar el argumento del elemento directamente dentro de ellas.
Cuando tienes una canalización de varios lenguajes y más de un SDK de Apache Beam se ejecuta en el trabajador, este usa el menor grado de paralelismo por proceso de cada posible.
Diferencias entre Java, Go y Python
Java, Go y Python administran los procesos y la memoria de manera diferente. Como resultado, el enfoque que debes adoptar cuando solucionas los problemas de errores de memoria varía según si tu canalización usa Java, Go o Python.
Canalizaciones de Java y Go
En canalizaciones de Java y Go:
- Cada trabajador inicia un proceso del SDK de Apache Beam.
- Los objetos y datos compartidos del SDK, como entradas complementarias y cachés, se comparten entre todos los subprocesos del trabajador.
- Por lo general, la memoria que usan los datos y los objetos compartidos del SDK no se escala en función de la cantidad de CPU virtuales en el trabajador.
Canalizaciones de Python
En las canalizaciones de Python:
- Cada trabajador inicia un proceso del SDK de Apache Beam por CPU virtual.
- Los objetos y datos compartidos del SDK, como las entradas y las memorias caché, se comparten entre todos los subprocesos de cada proceso del SDK de Apache Beam.
- La cantidad total de subprocesos en el trabajador escala de manera lineal en función de la cantidad de CPU virtuales. Como resultado, la memoria que usan los objetos compartidos y los datos del SDK crece de manera lineal con la cantidad de CPU virtuales.
- Los subprocesos que realizan el trabajo se distribuyen entre los procesos. Las unidades de trabajo nuevas se asignan a un proceso sin elementos de trabajo o al proceso con la menor cantidad de elementos de trabajo asignados actualmente.
Encontrar errores de memoria insuficiente
Para determinar si tu canalización se está quedando sin memoria, usa uno de los siguientes métodos.
- En la página Detalles del trabajo, en el panel Registros, consulta la pestaña Diagnóstico. Esta pestaña muestra errores relacionados con los problemas de memoria y la frecuencia con la que se producen.
- En la interfaz de supervisión de Dataflow, usa el gráfico de uso de memoria para supervisar la capacidad y el uso de memoria del trabajador.
En la página Detalles del trabajo, en el panel Registros, selecciona Registros de trabajador. Busca los errores de memoria.
Java
El supervisor de Java Memory, configurado por la
interfaz MemoryMonitorOptions
,
informa de forma periódica las métricas de recolección de elementos no utilizados. Si la fracción de tiempo de CPU usada para
la recopilación de elementos no utilizados excede un umbral del 50% durante
un período prolongado, el agente de SDK actual falla.
Deberías ver un error similar al siguiente ejemplo:
Shutting down JVM after 8 consecutive periods of measured GC thrashing. Memory is used/total/max = ...
Este error de memoria puede ocurrir cuando la memoria física aún está disponible. Por lo general, el error indica que el uso de memoria de la canalización es ineficiente. Para resolver este problema, optimiza tu canalización.
Si tu trabajo tiene errores de memoria elevados o no hay memoria suficiente, sigue las recomendaciones de esta página para optimizar el uso de memoria o aumentar la cantidad de memoria disponible.
Resuelve errores de memoria insuficiente
Los cambios en tu canalización de Dataflow pueden resolver los errores de memoria o reducir el uso de memoria. Entre los cambios posibles, se incluyen las siguientes acciones:
En el diagrama siguiente, se muestra el flujo de trabajo de solución de problemas de Dataflow que se describe en esta página.
Optimiza tu canalización
Varias operaciones de canalización pueden provocar errores de falta de memoria. En esta sección, se proporcionan opciones para reducir el uso de memoria de tu canalización. Para identificar las etapas de la canalización que consumen más memoria, usa Cloud Profiler para supervisar el rendimiento de las canalizaciones.
Puedes usar las siguientes prácticas recomendadas para optimizar tu canalización:
- Usa conectores de E/S integrados de Apache Beam para leer archivos
- Rediseñar las operaciones cuando se usan PTransforms
GroupByKey
- Reduce los datos de entrada de fuentes externas
- Comparte objetos en subprocesos
- Usa representaciones de elementos con eficiencia de memoria
- Reduce el tamaño de las entradas complementarias
Usa los conectores de E/S integrados de Apache Beam para leer archivos
No abras archivos grandes dentro de un DoFn
. Para leer archivos, usa los conectores de E/S integrados de Apache Beam.
Los archivos que se abren en una DoFn
deben caber en la memoria. Debido a que varias instancias DoFn
se ejecutan en simultáneo, los archivos grandes abiertos en DoFn
pueden causar errores de memoria insuficiente.
Rediseñar operaciones cuando se usan PTransforms GroupByKey
Cuando usas una PTransform GroupByKey
en Dataflow, los valores resultantes por clave y por ventana se procesan en un solo subproceso. Debido a que estos datos se pasan como una transmisión desde el servicio de backend de Dataflow a los trabajadores, no es necesario que se ajuste a la memoria del trabajador. Sin embargo, si los valores se recopilan en la memoria, la lógica de procesamiento puede causar errores de falta de memoria.
Por ejemplo, si tienes una clave que contiene datos para una ventana y agregas los valores clave a un objeto en la memoria, como una lista, puede haber errores de memoria insuficiente. En esta situación, es posible que el trabajador no tenga suficiente capacidad de memoria para contener todos los objetos.
Para obtener más información sobre las GroupByKey
transformaciones, consulta la documentación de
Python GroupByKey
y Java GroupByKey
de Apache Beam.
La siguiente lista contiene sugerencias para diseñar tu canalización a fin de minimizar el consumo de memoria cuando se usan PTransforms GroupByKey
.
- Para reducir la cantidad de datos por clave y por ventana, evita las claves con muchos valores, también conocidos como claves de acceso rápido.
- Para reducir la cantidad de datos recopilados por ventana, usa un tamaño de ventana más pequeño.
- Si usas valores de clave en una ventana para calcular un número, usa una transformación
Combine
. No realices el cálculo en una sola instanciaDoFn
después de recopilar los valores. - Filtra los valores o duplicados antes de procesarlos. Para obtener más información, consulta la documentación de transformación de Python
Filter
y JavaFilter
.
Reduce los datos de entrada de fuentes externas
Si realizas llamadas a una API externa o a una base de datos para enriquecimiento de datos, los datos que se muestran deben ajustarse a la memoria del trabajador.
Si agrupas en lotes las llamadas, se recomienda usar una transformación GroupIntoBatches
.
Si te quedas con errores de memoria insuficiente, reduce el tamaño del lote. Para obtener más información sobre la agrupación en lotes, consulta la documentación de transformación de Python GroupIntoBatches
y Java GroupIntoBatches
.
Comparte objetos en subprocesos
Compartir un objeto de datos en la memoria a través de instancias de DoFn
puede mejorar la eficiencia del espacio y el acceso. Los objetos de datos creados en cualquier método de DoFn
, incluidos Setup
, StartBundle
, Process
, FinishBundle
y Teardown
, se invocan para cada uno DoFn
. En Dataflow, cada trabajador puede tener varias instancias de DoFn
. Para obtener un uso de memoria más eficiente, pasa un objeto de datos como un singleton a fin de compartirlo entre varios DoFn
. Para obtener más información, consulta la entrada de blog Reutilización de caché en DoFn
.
Usa representaciones de elementos con eficiencia de memoria
Evalúa si puedes usar representaciones para elementos PCollection
que usan menos memoria. Cuando uses codificadores en tu canalización, considera no solo codificar representaciones, sino también decodificar los elementos PCollection
. Las matrices dispersas a menudo pueden beneficiarse de este tipo de optimización.
Reduce el tamaño de las entradas complementarias
Si tus DoFn
usan entradas complementarias, reduce el tamaño de la entrada complementaria. Para las entradas complementarias que son colecciones de elementos, considera usar vistas iterables, como AsIterable
o AsMultimap
, en lugar de vistas que materializan toda la entrada complementaria al mismo tiempo, como AsList
.
Haz que haya más memoria disponible
Para aumentar la memoria disponible, puedes aumentar la cantidad total de memoria disponible en los trabajadores sin cambiar la cantidad de memoria disponible por subproceso. De manera alternativa, puedes aumentar la cantidad de memoria disponible por subproceso. Cuando aumentas la memoria por subproceso, también aumentas la memoria total en el trabajador.
Puedes aumentar la cantidad de memoria disponible por subproceso de cuatro maneras:
- Usa un tipo de máquina con más memoria por CPU virtual.
- Usa un tipo de máquina con más CPU virtuales (canalizaciones de transmisión de Java y Go).
- Reduce la cantidad de subprocesos.
- Usa solo un proceso del SDK de Apache Beam (transmisión de Python y canalizaciones de Python Runner v2).
Usa un tipo de máquina con más memoria por CPU virtual
Para seleccionar un trabajador con más memoria por CPU virtual, usa uno de los siguientes métodos.
- Usa un tipo de máquina con alta capacidad de memoria en la familia de máquinas de uso general. Los tipos de máquinas con alta capacidad de memoria tienen más memoria por CPU virtual que los tipos de máquinas estándar. El uso de un tipo de máquina con alta capacidad de memoria aumenta la memoria disponible para cada trabajador y la memoria disponible por subproceso, ya que la cantidad de CPU virtuales sigue siendo la misma. Como resultado, usar un tipo de máquina con alta capacidad de memoria puede ser una forma rentable de seleccionar un trabajador con más memoria por CPU virtual.
- Para obtener más flexibilidad cuando se especifica la cantidad de CPU virtuales y de memoria, puedes usar un tipo personalizado de máquina. Con los tipos personalizados de máquinas, puedes aumentar la memoria en incrementos de 256 MB. El precio de estos tipos de máquinas es diferente al de los tipos estándar.
- Algunas familias de máquinas te permiten usar tipos personalizados de máquinas con memoria extendida. La memoria extendida habilita una proporción más alta de memoria por CPU virtual. El costo es más alto.
Para establecer tipos de trabajadores, usa la siguiente opción de canalización. Si deseas obtener más información, consulta Configura las opciones de canalización y Opciones de canalización.
Java
Usa la opción de canalización --workerMachineType
.
Python
Usa la opción de canalización --machine_type
.
Go
Usa la opción de canalización --worker_machine_type
.
Usa un tipo de máquina que tenga más CPU virtuales
Esta opción solo se recomienda para canalizaciones de transmisión de Java y Go. Los tipos de máquinas con más CPU virtuales tienen más memoria total, ya que la cantidad de memoria se escala de forma lineal con la cantidad de CPU virtuales. Por ejemplo, un tipo de máquina n1-standard-4
con cuatro CPU virtuales tiene 15 GB de memoria. Un tipo de máquina n1-standard-8
con ocho CPU virtuales tiene 30 GB de memoria. Para obtener más información sobre los tipos predefinidos de máquinas, consulta Familia de máquinas de uso general.
El uso de trabajadores con una mayor cantidad de CPU virtuales puede aumentar el costo de tu canalización de manera significativa. Sin embargo, puedes usar el ajuste de escala automático horizontal para reducir el número total de trabajadores a fin de que el paralelismo permanezca igual. Por ejemplo, si tienes 50 trabajadores que usan un tipo de máquina n1-standard-4
y cambias a un tipo de máquina n1-standard-8
, puedes usar el ajuste de escala automático horizontal y establecer la cantidad máxima de trabajadores para reducir la La cantidad total de trabajadores en tu canalización a alrededor de 25. Esta configuración da como resultado una canalización con un costo similar.
Para establecer la cantidad máxima de trabajadores, usa la siguiente opción de canalización.
Java
Usa la opción de canalización --maxNumWorkers
.
Para obtener más información, consulta Opciones de canalización.
Go
Usa la opción de canalización --max_num_workers
.
Para obtener más información, consulta Opciones de canalización.
No se recomienda este método para las canalizaciones de Python. Cuando usas el SDK de Python, si cambias a un trabajador con una mayor cantidad de CPU virtuales, no solo aumentas la memoria, sino que también aumentas la cantidad de procesos del SDK de Apache Beam. Por ejemplo, el tipo de máquina n1-standard-4
tiene la misma memoria por subproceso que el tipo de máquina n1-standard-8
para las canalizaciones de Python. Por lo tanto, con las canalizaciones de Python, la recomendación es usar un tipo de máquina con alta capacidad de memoria, reducir la cantidad de subprocesos o usar solo un proceso del SDK de Apache Beam.
Reduce la cantidad de subprocesos
Si usar un tipo de máquina con alta capacidad de memoria no resuelve tu problema, aumenta la memoria disponible por subproceso mediante la reducción de la cantidad máxima de subprocesos que ejecutan instancias DoFn
.
Este cambio reduce el paralelismo. Para reducir la cantidad de subprocesos del SDK de Apache Beam que ejecutan instancias DoFn
, usa la siguiente opción de canalización.
Java
Usa la opción de canalización --numberOfWorkerHarnessThreads
.
Para obtener más información, consulta Opciones de canalización.
Python
Usa la opción de canalización --number_of_worker_harness_threads
.
Para obtener más información, consulta Opciones de canalización.
Go
Usa la opción de canalización --number_of_worker_harness_threads
.
Para obtener más información, consulta Opciones de canalización.
Para reducir la cantidad de subprocesos de las canalizaciones por lotes de Java y Go, configura el valor de la marca en un número que sea menor que la cantidad de CPU virtuales en el trabajador. Para las canalizaciones de transmisión, establece el valor de la marca en un número menor que la cantidad de subprocesos por proceso del SDK de Apache Beam.
Para estimar los subprocesos por proceso, consulta la tabla en la sección Uso de memoria DoFn
de esta página.
Esta personalización no está disponible para las canalizaciones de Python que se ejecutan en el SDK 2.20.0 o versiones anteriores de Apache Beam o para las canalizaciones de Python que no usan Runner v2.
Usa solo un proceso del SDK de Apache Beam
Para las canalizaciones de transmisión de Python y las canalizaciones de Python que usan Runner v2, puedes forzar a Dataflow a que inicie solo un proceso del SDK de Apache Beam por trabajador. Antes de probar esta opción, intenta resolver el problema con los otros métodos. Si deseas configurar las VMs de trabajador de Dataflow para que inicien solo un proceso en Python en contenedor, usa la siguiente opción de canalización:
--experiments=no_use_multiple_sdk_containers
Con esta configuración, las canalizaciones de Python crean un proceso del SDK de Apache Beam por trabajador. Esta configuración evita que los objetos y datos compartidos se repliquen varias veces para cada proceso del SDK de Apache Beam. Sin embargo, limita el uso eficiente de los recursos de procesamiento disponibles en el trabajador.
Reducir la cantidad de procesos del SDK de Apache Beam a uno no siempre reduce la cantidad total de subprocesos iniciados en el trabajador. Además, tener todos los subprocesos en un solo proceso del SDK de Apache Beam puede hacer que el procesamiento lento o la canalización se detenga. Por lo tanto, es posible que también debas reducir la cantidad de subprocesos, como se describe en la sección Reduce la cantidad de subprocesos en esta página.
También puedes forzar a los trabajadores a usar solo un proceso del SDK de Apache Beam mediante el uso de un tipo de máquina con solo una CPU virtual.