Para detener un trabajo de Dataflow, puedes usar la Google Cloud consola, Cloud Shell, un terminal local instalado con la CLI de Google Cloud o la API REST de Dataflow.
Puedes detener una tarea de Dataflow de tres formas:
Cancela una tarea. Este método se aplica tanto a las canalizaciones de streaming como a las canalizaciones por lotes. Si cancelas una tarea, el servicio Dataflow dejará de procesar los datos, incluidos los almacenados en búfer. Para obtener más información, consulta Cancelar un trabajo.
Drena una tarea. Este método solo se aplica a las canalizaciones de streaming. Al drenar una tarea, el servicio Dataflow puede terminar de procesar los datos almacenados en búfer y, al mismo tiempo, dejar de ingerir datos nuevos. Para obtener más información, consulta Drenar un trabajo.
Fuerza la cancelación de una tarea. Este método se aplica tanto a las canalizaciones de streaming como a las canalizaciones por lotes. Si cancelas una tarea de forma forzada, el servicio Dataflow dejará de procesar datos inmediatamente, incluidos los datos almacenados en búfer. Antes de forzar la cancelación, debes intentar cancelar la suscripción de forma normal. La cancelación forzada solo se debe usar en tareas que se hayan quedado bloqueadas en el proceso de cancelación normal. Para obtener más información, consulta Forzar la cancelación de un trabajo.
Si cancelas un trabajo, no podrás volver a iniciarlo. Si no usas plantillas flexibles, puedes clonar la canalización cancelada e iniciar un nuevo trabajo a partir de la canalización clonada.
Antes de detener una canalización de streaming, considera la posibilidad de crear una captura del trabajo. Las capturas de Dataflow guardan el estado de un flujo de procesamiento de streaming, por lo que puedes iniciar una nueva versión de tu trabajo de Dataflow sin perder el estado. Para obtener más información, consulta el artículo sobre cómo utilizar capturas de Dataflow.
Si tienes una canalización compleja, te recomendamos que crees una plantilla y ejecutes la tarea a partir de ella.
No puedes eliminar tareas de Dataflow, pero sí puedes archivar las tareas completadas. Todos los trabajos completados, incluidos los de la lista de trabajos archivados, se eliminan tras un periodo de conservación de 30 días.
Cancelar una tarea de Dataflow
Cuando cancelas un trabajo, el servicio Dataflow lo detiene inmediatamente.
Cuando cancelas un trabajo, ocurre lo siguiente:
El servicio Dataflow detiene todas las operaciones de ingestión y procesamiento de datos.
El servicio Dataflow empieza a limpiar los recursos de Google Cloud Platform que están asociados a tu trabajo.
Estos recursos pueden incluir el cierre de instancias de trabajador de Compute Engine y de conexiones activas a fuentes o receptores de E/S.
Información importante sobre la cancelación de un trabajo
Si cancelas una tarea, se detendrá inmediatamente el procesamiento de la canalización.
Si cancelas un trabajo, es posible que pierdas los datos en curso. Los datos en tránsito son los que ya se han leído, pero que el flujo de procesamiento aún está procesando.
Es posible que se pueda seguir accediendo a los datos que se hayan escrito desde la canalización en un receptor de salida antes de cancelar el trabajo.
Si no te preocupa la pérdida de datos, cancelar el trabajo asegura que los recursos deGoogle Cloud asociados a él se cierren lo antes posible.
Finalizar una tarea de Dataflow
Cuando agotas una tarea, el servicio Dataflow la finaliza en su estado actual. Si quieres evitar la pérdida de datos al detener tus pipelines de streaming, la mejor opción es purgar tu trabajo.
Cuando agotas una tarea, se producen las siguientes acciones:
Tu trabajo dejará de ingerir datos nuevos de las fuentes de entrada poco después de recibir la solicitud de purga (normalmente, en unos minutos).
El servicio Dataflow conserva los recursos que ya haya, como las instancias de trabajador, para terminar de procesar y escribir los datos almacenados en búfer de tu flujo de procesamiento.
Cuando se completan todas las operaciones de escritura y procesamiento pendientes, el servicio Dataflow cierra los Google Cloud recursos Google Cloud asociados a tu trabajo.
Para vaciar un trabajo, Dataflow deja de leer nuevas entradas, marca la fuente con una marca de tiempo de evento infinita y, a continuación, propaga las marcas de tiempo infinitas a través de la canalización. Por lo tanto, es posible que las canalizaciones en proceso de drenaje tengan una marca de agua infinita.
Información importante sobre el drenaje de una tarea
No se admite la opción de desviar una tarea en las canalizaciones por lotes.
Tu flujo de procesamiento seguirá incurriendo en el coste de mantener los recursos deGoogle Cloud asociados hasta que se completen todos los procesos y la escritura.
Puedes actualizar una canalización que se esté vaciando. Si tu canalización está bloqueada, actualízala con el código que solucione el error que está causando el problema para que se vacíe correctamente sin perder datos.
Puedes cancelar una tarea que esté consumiendo recursos.
El drenaje de un trabajo puede tardar mucho tiempo en completarse, por ejemplo, cuando tu canalización tiene una gran cantidad de datos almacenados en búfer.
Si tu canalización de streaming incluye un DoFn divisible, debes truncar el resultado antes de ejecutar la opción de drenaje. Para obtener más información sobre cómo truncar Splittable DoFns, consulta la documentación de Apache Beam.
En algunos casos, es posible que una tarea de Dataflow no pueda completar la operación de purga. Puedes consultar los registros de los trabajos para determinar la causa principal y tomar las medidas oportunas.
Conservación de datos
El streaming de Dataflow tolera que los trabajadores se reinicien y no falla en las tareas de streaming cuando se producen errores. En su lugar, el servicio Dataflow vuelve a intentar la operación hasta que tomes una medida, como cancelar o reiniciar el trabajo. Cuando agotas la tarea, Dataflow sigue reintentándolo, lo que puede provocar que las canalizaciones se queden bloqueadas. En esta situación, para habilitar un drenaje correcto sin perder datos, actualiza la canalización con el código que solucione el error que está causando el problema.
Dataflow no confirma los mensajes hasta que el servicio de Dataflow los confirma de forma duradera. Por ejemplo, con Kafka, puedes ver este proceso como una transferencia segura de la propiedad del mensaje de Kafka a Dataflow, lo que elimina el riesgo de pérdida de datos.
Tareas bloqueadas
- El drenaje no corrige los flujos de procesamiento atascados. Si se bloquea el movimiento de datos, la canalización se quedará bloqueada después del comando de purga. Para solucionar un problema con una canalización, usa el comando update para actualizar la canalización con el código que resuelve el error que está causando el problema. También puedes cancelar las tareas que se hayan quedado bloqueadas, pero esto podría provocar la pérdida de datos.
Temporizadores
Si el código de tu canalización de streaming incluye un temporizador en bucle, el trabajo puede ser lento o no poder completarse. Como el drenaje no finaliza hasta que se completan todos los temporizadores, las canalizaciones con temporizadores de bucle infinito nunca finalizan el drenaje.
Dataflow espera a que se completen todos los temporizadores de tiempo de procesamiento en lugar de activarlos de inmediato, lo que puede provocar que los drenajes sean lentos.
Efectos de drenar una tarea
Cuando agotas un flujo de procesamiento en streaming, Dataflow cierra inmediatamente las ventanas en curso y activa todos los activadores.
El sistema no espera a que finalicen las ventanas basadas en el tiempo pendientes en una operación de drenaje.
Por ejemplo, si tu canalización lleva diez minutos de un periodo de dos horas cuando vacías el trabajo, Dataflow no espera a que termine el resto del periodo. Cierra la ventana inmediatamente con resultados parciales. Dataflow hace que las ventanas abiertas se cierren al avanzar la marca de agua de los datos hasta el infinito. Esta función también se puede usar con fuentes de datos personalizadas.
Cuando se vacía una canalización que usa una clase de fuente de datos personalizada, Dataflow deja de enviar solicitudes de datos nuevos, avanza la marca de agua de los datos hasta el infinito y llama al método finalize()
de la fuente en el último punto de control.
Si se agota, las ventanas pueden llenarse parcialmente. En ese caso, si reinicias la canalización agotada, es posible que la misma ventana se active una segunda vez, lo que puede provocar problemas con tus datos. Por ejemplo, en el siguiente caso, los archivos pueden tener nombres conflictivos y los datos pueden sobrescribirse:
Si agotas una canalización con ventanas por horas a las 12:34, la ventana de las 12:00 a las 13:00 se cierra solo con los datos que se activaron durante los primeros 34 minutos de la ventana. El flujo de procesamiento no lee datos nuevos después de las 12:34.
Si reinicias el flujo de procesamiento inmediatamente, se volverá a activar el periodo de 12:00 a 13:00, pero solo con los datos que se hayan leído entre las 12:35 y las 13:00. No se envían duplicados, pero si se repite un nombre de archivo, los datos se sobrescriben.
En la consola de Google Cloud , puedes ver los detalles de las transformaciones de tu canalización. En el siguiente diagrama se muestran los efectos de una operación de drenaje en proceso. Nota: La marca de agua se adelanta al valor máximo.
Imagen 1. Vista de pasos de una operación de drenaje.
Forzar la cancelación de una tarea de Dataflow
Usa la cancelación forzada solo cuando no puedas cancelar el trabajo con otros métodos. Si cancelas una tarea de forma forzada, se finalizará sin limpiar todos los recursos. Si usas la cancelación forzada repetidamente, es posible que se acumulen recursos con fugas, y estos recursos consumen tu cuota.
Cuando cancelas una tarea de forma forzada, el servicio Dataflow la detiene inmediatamente y se pierden las VMs que haya creado. La cancelación normal debe intentarse al menos 30 minutos antes de la cancelación forzada.
Cuando fuerzas la cancelación de un trabajo, ocurren las siguientes acciones:
- El servicio Dataflow detiene todas las operaciones de ingestión y procesamiento de datos.
Información importante sobre la cancelación forzosa de un trabajo
Si cancelas una tarea de forma forzada, se detendrá inmediatamente el procesamiento de la canalización.
La cancelación forzada de una tarea solo se debe usar en tareas que se hayan quedado bloqueadas en el proceso de cancelación normal.
No se liberan necesariamente las instancias de trabajador que haya creado la tarea de Dataflow, lo que puede provocar fugas de instancias de trabajador. Las instancias de trabajador filtradas no contribuyen a los costes de los trabajos, pero pueden usar tu cuota. Una vez que se haya cancelado el trabajo, puedes eliminar estos recursos.
En el caso de las tareas de Dataflow Prime, no puedes ver ni eliminar las VMs filtradas. En la mayoría de los casos, estas máquinas virtuales no crean problemas. Sin embargo, si las VMs filtradas causan problemas, como consumir tu cuota de VMs, ponte en contacto con el equipo de Asistencia.
Detener una tarea de Dataflow
Antes de detener un trabajo, debes conocer los efectos de cancelar, agotar o cancelar por la fuerza un trabajo.
Consola
Ve a la página Tareas de Dataflow.
Haz clic en el trabajo que quieras detener.
Para detener una tarea, su estado debe ser En curso.
En la página de detalles del trabajo, haz clic en Detener.
Elige una de estas opciones:
En el caso de una canalización por lotes, haz clic en Cancelar o en Forzar cancelación.
En el caso de una canalización de streaming, haz clic en Cancelar, Drenar o Forzar cancelación.
Para confirmar tu elección, haz clic en Detener trabajo.
gcloud
Para purgar o cancelar una tarea de Dataflow, puedes usar el comando gcloud dataflow jobs
en Cloud Shell o en un terminal local en el que esté instalada la CLI de gcloud.
Inicia sesión en tu shell.
Indica los IDs de las tareas de Dataflow que se están ejecutando y anota el ID de la tarea que quieras detener:
gcloud dataflow jobs list
Si no se define la marca
--region
, se mostrarán las tareas de Dataflow de todas las regiones disponibles.Elige una de estas opciones:
Para vaciar un trabajo de streaming, sigue estos pasos:
gcloud dataflow jobs drain JOB_ID
Sustituye
JOB_ID
por el ID de tarea que has copiado anteriormente.Para cancelar una tarea por lotes o de streaming, sigue estos pasos:
gcloud dataflow jobs cancel JOB_ID
Sustituye
JOB_ID
por el ID de tarea que has copiado anteriormente.Para forzar la cancelación de una tarea por lotes o de streaming, sigue estos pasos:
gcloud dataflow jobs cancel JOB_ID --force
Sustituye
JOB_ID
por el ID de tarea que has copiado anteriormente.
API
Para cancelar o vaciar una tarea mediante la API REST de Dataflow, puedes elegir entre projects.locations.jobs.update
o projects.jobs.update
.
En el cuerpo de la solicitud, escribe el estado de la tarea necesario en el campo requestedState
de la instancia de la tarea de la API elegida.
Importante: Te recomendamos que uses projects.locations.jobs.update
, ya que projects.jobs.update
solo permite actualizar el estado de los trabajos que se ejecutan en us-central1
.
Para cancelar el trabajo, asigna el estado
JOB_STATE_CANCELLED
.Para agotar la tarea, cambia su estado a
JOB_STATE_DRAINED
.Para forzar la cancelación del trabajo, asigna el estado
JOB_STATE_CANCELLED
con la etiqueta"force_cancel_job": "true"
. El cuerpo de la solicitud es el siguiente:{ "requestedState": "JOB_STATE_CANCELLED", "labels": { "force_cancel_job": "true" } }
Detectar la finalización de una tarea de Dataflow
Para detectar cuándo se ha completado la cancelación o el drenaje del trabajo, usa uno de los siguientes métodos:
- Usa un servicio de orquestación de flujos de trabajo, como Cloud Composer, para monitorizar tu tarea de Dataflow.
- Ejecuta la canalización de forma síncrona para que las tareas se bloqueen hasta que se complete la canalización. Para obtener más información, consulta Controlar los modos de ejecución en Configurar opciones de la canalización.
Usa la herramienta de línea de comandos de la CLI de Google Cloud para sondear el estado del trabajo. Para obtener una lista de todos los trabajos de Dataflow de tu proyecto, ejecuta el siguiente comando en tu shell o terminal:
gcloud dataflow jobs list
El resultado muestra el ID, el nombre, el estado (
STATE
) y otra información de cada trabajo. Para obtener más información, consulta Usar la CLI de Google Cloud para enumerar trabajos.
Archivar tareas de Dataflow completadas
Cuando archivas una tarea de Dataflow, se elimina de la lista de tareas de la página Tareas de Dataflow en la consola. El trabajo se mueve a una lista de trabajos archivados. Solo puedes archivar los trabajos completados, incluidos los que tengan los siguientes estados:
JOB_STATE_CANCELLED
JOB_STATE_DRAINED
JOB_STATE_DONE
JOB_STATE_FAILED
JOB_STATE_UPDATED
Para obtener más información sobre cómo verificar estos estados, consulta el artículo Detectar la finalización de una tarea de Dataflow.
Para obtener información sobre cómo solucionar problemas al archivar trabajos, consulta la sección Errores al archivar trabajos del artículo "Solucionar errores de Dataflow".
Todos los trabajos archivados se eliminan tras un periodo de conservación de 30 días.
Archivar una tarea
Sigue estos pasos para quitar una tarea completada de la lista principal de tareas de la página Tareas de Dataflow.
Consola
En la Google Cloud consola, ve a la página Trabajos de Dataflow.
Aparecerá una lista de tareas de Dataflow junto con su estado.
Selecciona un trabajo.
En la página Detalles del trabajo, haga clic en Archivar. Si la tarea no se ha completado, la opción Archivar no estará disponible.
REST
Para archivar un trabajo mediante la API, usa el método projects.locations.jobs.update
.
En esta solicitud, debes especificar un objeto JobMetadata
actualizado. En el objeto JobMetadata.userDisplayProperties
, usa el par clave-valor "archived":"true"
.
Además del objeto JobMetadata
actualizado, tu solicitud de la API también debe incluir el parámetro de consulta updateMask en la URL de la solicitud:
https://dataflow.googleapis.com/v1b3/[...]/jobs/JOB_ID/?updateMask=job_metadata.user_display_properties.archived
Antes de usar los datos de la solicitud, haz las siguientes sustituciones:
- PROJECT_ID: tu ID de proyecto
- REGION: una región de Dataflow
- JOB_ID: el ID de tu tarea de Dataflow
Método HTTP y URL:
PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID/?updateMask=job_metadata.user_display_properties.archived
Cuerpo JSON de la solicitud:
{ "job_metadata": { "userDisplayProperties": { "archived": "true" } } }
Para enviar tu solicitud, elige una de estas opciones:
curl
Guarda el cuerpo de la solicitud en un archivo llamado request.json
y ejecuta el siguiente comando:
curl -X PUT \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json; charset=utf-8" \
-d @request.json \
"https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID/?updateMask=job_metadata.user_display_properties.archived"
PowerShell
Guarda el cuerpo de la solicitud en un archivo llamado request.json
y ejecuta el siguiente comando:
$cred = gcloud auth print-access-token
$headers = @{ "Authorization" = "Bearer $cred" }
Invoke-WebRequest `
-Method PUT `
-Headers $headers `
-ContentType: "application/json; charset=utf-8" `
-InFile request.json `
-Uri "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID/?updateMask=job_metadata.user_display_properties.archived" | Select-Object -Expand Content
Deberías recibir una respuesta JSON similar a la siguiente:
{ "id": "JOB_ID", "projectId": "PROJECT_ID", "currentState": "JOB_STATE_DONE", "currentStateTime": "2025-05-20T20:54:41.651442Z", "createTime": "2025-05-20T20:51:06.031248Z", "jobMetadata": { "userDisplayProperties": { "archived": "true" } }, "startTime": "2025-05-20T20:51:06.031248Z" }
gcloud
Este comando archiva un solo trabajo. La tarea debe estar en un estado final para poder archivarse. De lo contrario, se rechazará la solicitud. Después de enviar el comando, debes confirmar que quieres ejecutarlo.
Antes de usar los datos de los comandos que se indican a continuación, haz los siguientes cambios:
JOB_ID
: el ID de la tarea de Dataflow que quieres archivar.REGION
: opcional. La región de Dataflow de tu tarea.
Ejecuta el siguiente comando:
Linux, macOS o Cloud Shell
gcloud dataflow jobs archive JOB_ID --region=REGION_ID
Windows (PowerShell)
gcloud dataflow jobs archive JOB_ID --region=REGION_ID
Windows (cmd.exe)
gcloud dataflow jobs archive JOB_ID --region=REGION_ID
Deberías recibir una respuesta similar a la siguiente:
Archived job [JOB_ID]. createTime: '2025-06-29T11:00:02.432552Z' currentState: JOB_STATE_DONE currentStateTime: '2025-06-29T11:04:25.125921Z' id: JOB_ID jobMetadata: userDisplayProperties: archived: 'true' projectId: PROJECT_ID startTime: '2025-06-29T11:00:02.432552Z'
Ver y restaurar trabajos archivados
Sigue estos pasos para ver las tareas archivadas o para restaurarlas en la lista principal de tareas de la página Tareas de Dataflow.
Consola
En la Google Cloud consola, ve a la página Trabajos de Dataflow.
Haz clic en el interruptor Archivados. Aparecerá una lista de trabajos de Dataflow archivados.
Selecciona un trabajo.
Para restaurar el trabajo en la lista principal de trabajos de la página Trabajos de Dataflow, en la página Detalles del trabajo, haz clic en Restaurar.
REST
Para restaurar un trabajo archivado mediante la API, usa el método projects.locations.jobs.update
.
En esta solicitud, debes especificar un objeto JobMetadata
actualizado. En el objeto JobMetadata.userDisplayProperties
, usa el par clave-valor "archived":"false"
.
Además del objeto JobMetadata
actualizado, tu solicitud de la API también debe incluir el parámetro de consulta updateMask en la URL de la solicitud:
https://dataflow.googleapis.com/v1b3/[...]/jobs/JOB_ID/?updateMask=job_metadata.user_display_properties.archived
Antes de usar los datos de la solicitud, haz las siguientes sustituciones:
- PROJECT_ID: tu ID de proyecto
- REGION: una región de Dataflow
- JOB_ID: el ID de tu tarea de Dataflow
Método HTTP y URL:
PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID/?updateMask=job_metadata.user_display_properties.archived
Cuerpo JSON de la solicitud:
{ "job_metadata": { "userDisplayProperties": { "archived": "false" } } }
Para enviar tu solicitud, elige una de estas opciones:
curl
Guarda el cuerpo de la solicitud en un archivo llamado request.json
y ejecuta el siguiente comando:
curl -X PUT \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json; charset=utf-8" \
-d @request.json \
"https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID/?updateMask=job_metadata.user_display_properties.archived"
PowerShell
Guarda el cuerpo de la solicitud en un archivo llamado request.json
y ejecuta el siguiente comando:
$cred = gcloud auth print-access-token
$headers = @{ "Authorization" = "Bearer $cred" }
Invoke-WebRequest `
-Method PUT `
-Headers $headers `
-ContentType: "application/json; charset=utf-8" `
-InFile request.json `
-Uri "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID/?updateMask=job_metadata.user_display_properties.archived" | Select-Object -Expand Content
Deberías recibir una respuesta JSON similar a la siguiente:
{ "id": "JOB_ID", "projectId": "PROJECT_ID", "currentState": "JOB_STATE_DONE", "currentStateTime": "2025-05-20T20:54:41.651442Z", "createTime": "2025-05-20T20:51:06.031248Z", "jobMetadata": { "userDisplayProperties": { "archived": "false" } }, "startTime": "2025-05-20T20:51:06.031248Z" }
Siguientes pasos
- Consulta la API REST de Dataflow.
- Consulta la interfaz de monitorización de Dataflow en la consola. Google Cloud
- Consulta más información sobre cómo actualizar una canalización.