Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Esta página te guiará en la creación de una arquitectura push basada en eventos activar DAG de Cloud Composer en respuesta a Pub/Sub cambios de tema. Los ejemplos de este instructivo demuestran cómo controlar el ciclo completo de la administración de Pub/Sub, incluida la administración de suscripciones, como parte del proceso de DAG. Es adecuado para algunos de los casos de uso comunes en los que deben activar DAG, pero no quieren configurar permisos de acceso adicionales.
Por ejemplo, los mensajes enviados a través de Pub/Sub pueden usarse como una solución si no quieres proporcionar acceso directo a un Cloud Composer el entorno por motivos de seguridad. Puedes configurar un Cloud Function que crea mensajes de Pub/Sub y y las publica en un tema de Pub/Sub. Luego, puedes crear un DAG extrae los mensajes de Pub/Sub y, luego, los administra.
En este ejemplo específico, se crea una Cloud Function y se implementa dos DAG. El primer DAG extrae mensajes de Pub/Sub y activa segundo DAG según el contenido del mensaje de Pub/Sub.
En este instructivo, se supone que estás familiarizado con Python y la consola de Google Cloud.
Objetivos
Costos
En este instructivo, se usan los siguientes componentes facturables de Google Cloud:
- Cloud Composer (también consulta costos adicionales)
- Pub/Sub
- Cloud Functions
Cuando finalices este instructivo, para evitar que se te siga facturando, borra los recursos que creaste. Consulta la sección Limpieza para obtener más detalles.
Antes de comenzar
Para este instructivo, necesitas Google Cloud proyecto. Configura el proyecto de la siguiente manera:
En la consola de Google Cloud, selecciona o crea un proyecto:
Asegúrate de tener habilitada la facturación para tu proyecto. Obtén más información para verificar si la facturación está habilitada en un proyecto.
Asegúrate de que el usuario de tu proyecto de Google Cloud tenga los siguientes roles para crear los recursos necesarios:
- Usuario de cuenta de servicio (
roles/iam.serviceAccountUser
) - Editor de Pub/Sub (
roles/pubsub.editor
) - Administrador de objetos de almacenamiento y entorno
(
roles/composer.environmentAndStorageObjectAdmin
) - Administrador de Cloud Functions (
roles/cloudfunctions.admin
) - Visualizador de registros (
roles/logging.viewer
)
- Usuario de cuenta de servicio (
Asegúrate de que la cuenta de servicio que ejecuta tu Cloud Function tenga permisos suficientes en tu proyecto para acceder a Pub/Sub. De de forma predeterminada, Cloud Functions usa Cuenta de servicio predeterminada de App Engine. Esta cuenta de servicio tiene el rol de Editor, que es suficiente permisos para este instructivo.
Habilita las API para tu proyecto.
Console
Enable the Cloud Composer, Cloud Functions, and Pub/Sub APIs.
gcloud
Enable the Cloud Composer, Cloud Functions, and Pub/Sub APIs:
gcloud services enable composer.googleapis.comcloudfunctions.googleapis.com pubsub.googleapis.com
Terraform
Para habilitar la API de Cloud Composer en tu proyecto, agrega lo siguiente: las definiciones de recursos a tu secuencia de comandos de Terraform:
resource "google_project_service" "composer_api" {
project = "<PROJECT_ID>"
service = "composer.googleapis.com"
// Disabling Cloud Composer API might irreversibly break all other
// environments in your project.
// This parameter prevents automatic disabling
// of the API when the resource is destroyed.
// We recommend to disable the API only after all environments are deleted.
disable_on_destroy = false
// this flag is introduced in 5.39.0 version of Terraform. If set to true it will
//prevent you from disabling composer_api through Terraform if any environment was
//there in the last 30 days
check_if_service_has_usage_on_destroy = true
}
resource "google_project_service" "pubsub_api" {
project = "<PROJECT_ID>"
service = "pubsub.googleapis.com"
disable_on_destroy = false
}
resource "google_project_service" "functions_api" {
project = "<PROJECT_ID>"
service = "cloudfunctions.googleapis.com"
disable_on_destroy = false
}
Reemplaza <PROJECT_ID>
por Project ID.
de tu proyecto. Por ejemplo, example-project
.
Crea tu entorno de Cloud Composer
Crea un entorno de Cloud Composer 2.
Como parte de este procedimiento,
otorgas la extensión de agente de servicio de la API de Cloud Composer v2
(roles/composer.ServiceAgentV2Ext
) para el agente de servicio de Composer
de servicio predeterminada. Cloud Composer usa esta cuenta para realizar operaciones
en tu proyecto de Google Cloud.
Crea un tema de Pub/Sub
En este ejemplo, se activa un DAG en respuesta a un mensaje enviado a un Tema de Pub/Sub. Crear un tema de Pub/Sub para usar en esta ejemplo:
Console
En la consola de Google Cloud, ve a la página Temas de Pub/Sub.
Haz clic en Crear tema.
En el campo ID del tema, ingresa
dag-topic-trigger
como un ID para tu en el tema.Deja las demás opciones en sus valores predeterminados.
Haz clic en Crear tema.
gcloud
Para crear un tema, ejecuta gcloud pubsub topics create en Google Cloud CLI:
gcloud pubsub topics create dag-topic-trigger
Terraform
Agrega las siguientes definiciones de recursos a tu secuencia de comandos de Terraform:
resource "google_pubsub_topic" "trigger" {
project = "<PROJECT_ID>"
name = "dag-topic-trigger"
message_retention_duration = "86600s"
}
Reemplaza <PROJECT_ID>
por Project ID.
de tu proyecto. Por ejemplo, example-project
.
Sube tus DAG
Sube DAG a tu entorno:
- Guarda el siguiente archivo de DAG en tu computadora local.
- Reemplaza
<PROJECT_ID>
por Project ID. de tu proyecto. Por ejemplo,example-project
. - Sube el archivo DAG editado a tu entorno.
El código de muestra contiene dos DAG: trigger_dag
y target_dag
.
El DAG trigger_dag
se suscribe a un tema de Pub/Sub, extrae
Mensajes de Pub/Sub y activa otro DAG especificado en el ID del DAG
de los datos del mensaje de Pub/Sub. En este ejemplo, trigger_dag
activa
El DAG target_dag
, que envía mensajes a los registros de tareas.
El DAG trigger_dag
contiene las siguientes tareas:
subscribe_task
: Suscribirte a un tema de Pub/Subpull_messages_operator
: Lee los datos de un mensaje de Pub/Sub conPubSubPullOperator
.trigger_target_dag
: Activa otro DAG (en este ejemplo,target_dag
). según los datos de los mensajes extraídos en el tema.
El DAG target_dag
contiene solo una tarea: output_to_logs
. Esta tarea
imprime mensajes en el registro de tareas con un segundo de demora.
Implementa una Cloud Function que publique mensajes en un tema de Pub/Sub
En esta sección, implementarás una Cloud Function que publica mensajes en un tema de Pub/Sub.
Crea una Cloud Function y especifica su configuración
Console
En la consola de Google Cloud, ve a la página Cloud Functions.
Haz clic en Crear función.
En el campo Entorno, selecciona 1st gen.
En el campo Nombre de la función, ingresa el nombre de tu función:
pubsub-publisher
En el campo Activador, selecciona HTTP.
En la sección Autenticación, selecciona Permite invocaciones no autenticadas. Esta opción otorga usuarios no autenticados la capacidad de invocar una función de HTTP.
Haz clic en Guardar.
Haz clic en Siguiente para avanzar al paso Código.
Terraform
Considera usar la consola de Google Cloud para este paso, y directa de administrar el código fuente de la función desde Terraform.
En este ejemplo, se muestra cómo subir una Cloud Function desde un archivo ZIP local creando un bucket de Cloud Storage. almacenar el archivo en este bucket y, luego, usar el archivo del bucket como fuente de la Cloud Function. Si usas este enfoque, Terraform no actualiza automáticamente el código fuente de tu función, incluso si creas un nuevo archivo. Para volver a subir el código de la función, puede cambiar el nombre del archivo.
- Descarga el
pubsub_publisher.py
y lasrequirements.txt
archivos. - En el archivo
pubsub_publisher.py
, reemplaza<PROJECT_ID>
por Es el ID del proyecto. Por ejemplo,example-project
. - Crea un archivo ZIP llamado
pubsub_function.zip
con elpbusub_publisner.py
y el archivorequirements.txt
. - Guarda el archivo ZIP en un directorio donde esté almacenada la secuencia de comandos de Terraform.
- Agrega las siguientes definiciones de recursos a tu secuencia de comandos de Terraform y
Reemplaza
<PROJECT_ID>
por el ID de tu proyecto.
resource "google_storage_bucket" "cloud_function_bucket" {
project = <PROJECT_ID>
name = "<PROJECT_ID>-cloud-function-source-code"
location = "US"
force_destroy = true
uniform_bucket_level_access = true
}
resource "google_storage_bucket_object" "cloud_function_source" {
name = "pubsub_function.zip"
bucket = google_storage_bucket.cloud_function_bucket.name
source = "./pubsub_function.zip"
}
resource "google_cloudfunctions_function" "pubsub_function" {
project = <PROJECT_ID>
name = "pubsub-publisher"
runtime = "python310"
region = "us-central1"
available_memory_mb = 128
source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
source_archive_object = "pubsub_function.zip"
timeout = 60
entry_point = "pubsub_publisher"
trigger_http = true
}
Especifica los parámetros de código de la Cloud Function
Console
En el paso Código, en el campo Entorno de ejecución, selecciona el lenguaje. el entorno de ejecución que usa tu función. En este ejemplo, selecciona Python 3.10.
En el campo Punto de entrada, ingresa
pubsub_publisher
. Este es el código que se ejecuta cuando se ejecuta tu Cloud Function. El valor de este marcador debe ser un nombre de función o un nombre de clase completamente calificado que que existe en tu código fuente.
Terraform
Omite este paso. Los parámetros de Cloud Function ya están definidos en
el recurso google_cloudfunctions_function
Sube el código de tu Cloud Function
Console
En el campo Código fuente, selecciona la opción que corresponda según cómo deseas proporciona el código fuente de la función. En este instructivo, agrega el código de tu función con Cloud Functions Editor intercalado. Como alternativa, puedes subir un archivo ZIP o usar Cloud Source Repositories.
- Ingresa el siguiente ejemplo de código en el archivo main.py.
- Reemplaza
<PROJECT_ID>
por Project ID. de tu proyecto. Por ejemplo,example-project
.
Terraform
Omite este paso. Los parámetros de Cloud Function ya están definidos en
el recurso google_cloudfunctions_function
Especifica tus dependencias de Cloud Function
Console
Especifica las dependencias de la función en el archivo de metadatos requirements.txt:
Cuando implementas tu función, Cloud Functions descarga e instala
dependencias declaradas en el archivo requirements.txt, una línea por paquete.
Este archivo debe estar en el mismo directorio que el archivo main.py que contiene
el código de tu función. Para obtener más detalles, consulta
Archivos de requisitos
en la documentación de pip
.
Terraform
Omite este paso. Las dependencias de Cloud Function se definen en
el archivo requirements.txt
en el archivo pubsub_function.zip
.
Implementa tu Cloud Function
Console
Haga clic en Implementar. Cuando la implementación finaliza correctamente, aparece la función con una marca de verificación verde en la página Cloud Functions Consola de Google Cloud
Asegúrate de que la cuenta de servicio que ejecuta tu Cloud Function tenga los permisos suficientes en tu proyecto para acceder Pub/Sub
Terraform
Inicializa Terraform mediante este comando:
terraform init
Revisa la configuración y verifica que los recursos de Terraform que crearás o actualizarás en función de tus expectativas:
terraform plan
Para comprobar si tu configuración es válida, ejecuta el siguiente comando: :
terraform validate
Para aplicar la configuración de Terraform, ejecuta el siguiente comando: ingresando “yes” en el prompt:
terraform apply
Espera hasta que Terraform muestre el mensaje “Apply complete!”.
En la consola de Google Cloud, navega a tus recursos en la IU para hacer de que Terraform las creó o actualizó.
Prueba tu función de Cloud Functions
Para verificar que tu función publique un mensaje en un tema de Pub/Sub y que los DAG de ejemplo funcionan según lo previsto:
Verifica que los DAG estén activos:
En la consola de Google Cloud, ve a la página Entornos.
En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.
Ve a la pestaña DAG.
Verifica los valores en la columna Estado para los DAG llamados
trigger_dag
ytarget_dag
Ambos DAG deben tener el estadoActive
.
Envía un mensaje de prueba de Pub/Sub. Puedes hacerlo en Cloud Shell:
En la consola de Google Cloud, ve a la página Funciones.
Haz clic en el nombre de tu función,
pubsub-publisher
.Ve a la pestaña Pruebas.
En la sección Configure activation event, ingresa lo siguiente Par clave-valor JSON:
{"message": "target_dag"}
. No modifiques el par clave-valor porque este mensaje activa el DAG de prueba más tarde.En la sección Comando de prueba, haz clic en Probar en Cloud Shell.
En la terminal de Cloud Shell, espera hasta que aparezca un comando. automáticamente. Para ejecutar este comando, presiona
Enter
.Si aparece el mensaje Autoriza Cloud Shell, Haz clic en Autorizar.
Comprueba que el contenido del mensaje corresponda a Pub/Sub mensaje. En este ejemplo, el mensaje de salida debe comenzar con
Message b'target_dag' with message_length 10 published to
como respuesta de tu función.
Verifica que se haya activado
target_dag
:Espera al menos un minuto para que una nueva ejecución de DAG de
trigger_dag
de datos completados.En la consola de Google Cloud, ve a la página Entornos.
En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.
Ve a la pestaña DAG.
Haz clic en
trigger_dag
para ir a la página Detalles de DAG. En Runs, se mostrará una lista de ejecuciones de DAG para el DAGtrigger_dag
.Este DAG se ejecuta cada minuto y procesa todos los datos de Pub/Sub mensajes enviados desde la función. Si no se enviaron mensajes, el La tarea
trigger_target
está marcada comoSkipped
en los registros de ejecución del DAG. Si Se activaron los DAG, luego, la tareatrigger_target
se marcó comoSuccess
Revisa varias ejecuciones recientes de DAG para ubicar una ejecución de DAG en la que tres tareas (
subscribe_task
,pull_messages_operator
ytrigger_target
) están en el estadoSuccess
.Regresa a la pestaña DAGs y verifica que las ejecuciones de ejecuciones correctas para el DAG
target_dag
indica una ejecución correcta.
Resumen
En este instructivo, aprendiste a usar Cloud Functions para publicar mensajes en un tema de Pub/Sub e implementarás un DAG que se suscriba a un Tema de Pub/Sub, extraer mensajes de Pub/Sub y activadores otro DAG especificado en el ID del DAG de los datos del mensaje.
También hay formas alternativas de crear y administrar suscripciones a Pub/Sub y activar DAG que están fuera del alcance de este instructivo. Por ejemplo, puedes Usa Cloud Functions para activar los DAG de Airflow cuando ocurre un evento específico. Consulta nuestros instructivos para probar las otras funciones funciones de Google Cloud para ti mismo.
Limpia
Para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos usados en este instructivo, borra el proyecto que contiene los recursos o conservarlo y borrar los recursos individuales.
Borra el proyecto
Borra un proyecto de Google Cloud:
gcloud projects delete PROJECT_ID
Borra los recursos individuales
Si planeas explorar varios instructivos y guías de inicio rápido, la reutilización de proyectos puede ayudarte a evitar exceder los límites de las cuotas del proyecto.
Console
- Borra el entorno de Cloud Composer. También borrar el bucket del entorno durante el procedimiento.
- Borra el tema de Pub/Sub,
dag-topic-trigger
. Borra la Cloud Function.
En la consola de Google Cloud, ve a Cloud Functions.
Haz clic en la casilla de verificación de la función que deseas borrar
pubsub-publisher
Haz clic en Borrar y, luego, sigue las instrucciones.
Terraform
- Asegúrate de que tu secuencia de comandos de Terraform no contenga entradas para recursos que aún necesita tu proyecto. Por ejemplo, es posible que quieras mantener algunas APIs habilitadas y,IAM permisos aún asignados (si agregaste esas definiciones a tu de Terraform).
- Ejecuta
terraform destroy
. - Borra el bucket del entorno de forma manual. Cloud Composer no lo borra automáticamente. Puedes hacerlo desde la consola de Google Cloud o Google Cloud CLI.
¿Qué sigue?
- Prueba los DAG
- Prueba funciones de HTTP
- Implementa una Cloud Function
- Prueba otras funciones de Google Cloud. Echa un vistazo nuestros instructivos.