En este documento se explica cómo hacer lo siguiente en Dataform:
- Programar ejecuciones con configuraciones de flujo de trabajo.
- Programa ejecuciones con Workflows y Cloud Scheduler.
- Programa ejecuciones con Cloud Composer.
Antes de empezar
Para programar ejecuciones con configuraciones de flujo de trabajo o programar ejecuciones con flujos de trabajo y Cloud Scheduler, haz lo siguiente:
En la Google Cloud consola, ve a la página Dataform.
Seleccione o cree un repositorio.
Crea una configuración de lanzamiento.
Para programar ejecuciones con Cloud Composer, haz lo siguiente:
- Seleccione o cree un repositorio de Dataform.
- Concede acceso a Dataform a BigQuery.
- Selecciona o crea un espacio de trabajo de Dataform.
- Crea al menos una tabla.
- Crea un entorno de Cloud Composer 2.
Roles obligatorios
Para obtener los permisos que necesitas para completar las tareas de este documento, pide a tu administrador que te conceda los siguientes roles de IAM:
-
Administrador de Dataform (
roles/dataform.admin
) en repositorios -
Trabajador de Composer (
roles/composer.worker
) en la cuenta de servicio del entorno de Cloud Composer
Para obtener más información sobre cómo conceder roles, consulta el artículo Gestionar el acceso a proyectos, carpetas y organizaciones.
También puedes conseguir los permisos necesarios a través de roles personalizados u otros roles predefinidos.
Para usar una cuenta de servicio que no sea la predeterminada de Dataform, concede acceso a la cuenta de servicio personalizada.
Para habilitar las ejecuciones programadas de una configuración de flujo de trabajo cuando el modo estricto de actuar como esté habilitado, debes conceder el permiso iam.serviceAccounts.actAs
a la cuenta de servicio de Dataform de la cuenta de servicio que se use en la configuración del flujo de trabajo. Este permiso está disponible en el rol Usuario de cuenta de servicio (roles/iam.serviceAccountUser
).
Para usar las credenciales de usuario de la cuenta de Google al crear una configuración de flujo de trabajo (Vista previa), concede acceso a la cuenta de Google.
Programar ejecuciones con configuraciones de flujo de trabajo
En esta sección se explica cómo crear una configuración de flujo de trabajo en Dataform para programar y configurar ejecuciones de flujos de trabajo. Puedes usar configuraciones de flujos de trabajo para ejecutar flujos de trabajo de Dataform según una programación.
Acerca de las configuraciones de flujo de trabajo
Para programar ejecuciones de Dataform de todas las acciones de flujo de trabajo o de algunas de ellas en BigQuery, puedes crear configuraciones de flujo de trabajo. En una configuración de flujo de trabajo, se selecciona una configuración de lanzamiento de compilación, se eligen las acciones del flujo de trabajo que se van a ejecutar y se define la programación de ejecución.
Después, durante una ejecución programada de la configuración del flujo de trabajo, Dataform implementa en BigQuery la selección de acciones del resultado de compilación más reciente de la configuración de lanzamiento. También puedes activar manualmente la ejecución de una configuración de flujo de trabajo con la API workflowConfigs de Dataform.
Una configuración de flujo de trabajo de Dataform contiene los siguientes ajustes de ejecución:
- ID de la configuración del flujo de trabajo.
- Configuración de la versión.
Cuenta de servicio.
Esta es la cuenta de servicio asociada a la configuración del flujo de trabajo. Puedes seleccionar la cuenta de servicio de Dataform predeterminada o una cuenta de servicio asociada a tu proyecto de Google Cloud , o bien introducir manualmente otra cuenta de servicio. De forma predeterminada, las configuraciones de flujo de trabajo usan las mismas cuentas de servicio que sus repositorios.
Las credenciales de la cuenta de servicio son el método de autorización predeterminado para crear y ejecutar configuraciones de flujos de trabajo programados.
Credenciales de usuario de la cuenta de Google (vista previa)
Las credenciales de usuario de la cuenta de Google son el método de autorización predeterminado para la creación y las ejecuciones de configuraciones de flujo de trabajo manuales y no programadas. Para obtener más información, consulta el artículo Autorizar tu cuenta de Google.
Acciones del flujo de trabajo que se van a ejecutar:
- Todas las acciones.
- Selección de acciones.
- Selección de etiquetas.
Programación de ejecución y zona horaria.
Crear una configuración de flujo de trabajo
Para crear una configuración de flujo de trabajo de Dataform, sigue estos pasos:
- En tu repositorio, ve a Lanzamientos y programación.
- En la sección Configuraciones de flujo de trabajo, haz clic en Crear.
En el panel Crear configuración de flujo de trabajo, en el campo ID de configuración, introduce un ID único para la configuración del flujo de trabajo.
Los IDs solo pueden incluir números, letras, guiones y guiones bajos.
En el menú Configuración de la versión, selecciona una configuración de la versión de compilación.
En la sección Autenticación, autoriza la configuración del flujo de trabajo con las credenciales de usuario de tu cuenta de Google o con una cuenta de servicio.
- Para usar las credenciales de usuario de tu cuenta de Google (Vista previa), selecciona Ejecutar con mis credenciales de usuario.
- Para usar una cuenta de servicio, selecciona Ejecutar con la cuenta de servicio seleccionada y, a continuación, selecciona la cuenta de servicio de Dataform predeterminada o cualquier cuenta de servicio asociada a tu proyecto deGoogle Cloud al que tengas acceso. Si no seleccionas ninguna cuenta de servicio, la configuración del flujo de trabajo usará la cuenta de servicio del repositorio.
Opcional: En el campo Frecuencia de programación, introduce la frecuencia de las ejecuciones en formato cron de UNIX.
Para verificar que Dataform ejecuta el resultado de compilación más reciente en la configuración de lanzamiento correspondiente, deja un intervalo de al menos una hora entre el momento en que se crea el resultado de compilación y el momento en que se programa la ejecución.
Opcional: En el menú Zona horaria, selecciona la zona horaria de las carreras.
La zona horaria predeterminada es UTC.
Selecciona las acciones del flujo de trabajo que se van a ejecutar:
- Para ejecutar todo el flujo de trabajo, haz clic en Todas las acciones.
- Para ejecutar las acciones seleccionadas en el flujo de trabajo, haz clic en Selección de acciones y, a continuación, selecciona las acciones.
- Para ejecutar acciones con las etiquetas seleccionadas, haga clic en Selección de etiquetas y, a continuación, seleccione las etiquetas.
- Opcional: Para ejecutar las acciones o etiquetas seleccionadas y sus dependencias, selecciona la opción Incluir dependencias.
- Opcional: Para ejecutar las acciones o etiquetas seleccionadas y sus elementos dependientes, selecciona la opción Incluir elementos dependientes.
- Opcional: Para volver a crear todas las tablas desde cero, selecciona la opción Ejecutar con actualización completa.
Si no se usa esta opción, Dataform actualiza las tablas incrementales sin tener que volver a crearlas desde cero.
Haz clic en Crear. Si has seleccionado Ejecutar con mis credenciales de usuario como método de autenticación, debes autorizar tu cuenta de Google (Vista previa).
Por ejemplo, la siguiente configuración de flujo de trabajo ejecuta acciones con la etiqueta hourly
cada hora en la zona horaria CEST:
- ID de configuración:
production-hourly
- Configuración de la versión: -
- Frecuencia:
0 * * * *
- Zona horaria:
Central European Summer Time (CEST)
- Selección de acciones del flujo de trabajo: selección de etiquetas, etiqueta
hourly
Autorizar tu cuenta de Google
Para autenticar el recurso con las credenciales de usuario de tu cuenta de Google, debes conceder manualmente permiso a las canalizaciones de BigQuery para que obtengan el token de acceso de tu cuenta de Google y accedan a los datos de origen en tu nombre. Puedes conceder la aprobación manual con la interfaz del cuadro de diálogo de OAuth.
Solo tienes que dar permiso a las canalizaciones de BigQuery una vez.
Para revocar el permiso que has concedido, sigue estos pasos:
- Ve a la página de tu cuenta de Google.
- Haz clic en BigQuery Pipelines.
- Haz clic en Quitar acceso.
Si se cambian las credenciales para modificar el propietario de la configuración del flujo de trabajo, también se requiere una aprobación manual si el nuevo propietario de la cuenta de Google nunca ha creado una configuración de flujo de trabajo.
Editar una configuración de flujo de trabajo
Para editar una configuración de flujo de trabajo, sigue estos pasos:
- En tu repositorio, ve a Lanzamientos y programación.
- En la configuración del flujo de trabajo que quieras editar, haz clic en el menú Más y, a continuación, en Editar.
- En el panel Editar configuración del flujo de trabajo, edita los ajustes de configuración de la versión y, a continuación, haz clic en Guardar.
Eliminar una configuración de flujo de trabajo
Para eliminar una configuración de flujo de trabajo, sigue estos pasos:
- En tu repositorio, ve a Lanzamientos y programación.
- En la configuración del flujo de trabajo que quieras eliminar, haz clic en el menú Más y, a continuación, en Eliminar.
- En el cuadro de diálogo Eliminar configuración de lanzamiento, haz clic en Eliminar.
Programar ejecuciones con Workflows y Cloud Scheduler
En esta sección se muestra cómo programar ejecuciones de flujos de trabajo de Dataform con Workflows y Cloud Scheduler.
Acerca de las ejecuciones programadas de flujos de trabajo
Puedes definir la frecuencia de las ejecuciones de tu flujo de trabajo de Dataform creando una tarea de Cloud Scheduler que active un flujo de trabajo de Workflows. Workflows ejecuta servicios en un flujo de trabajo de orquestación que defines.
Workflows ejecuta tu flujo de trabajo de Dataform en un proceso de dos pasos. En primer lugar, extrae el código del repositorio de Dataform de tu proveedor de Git y lo compila para obtener un resultado de compilación. Después, usa el resultado de la compilación para crear un flujo de trabajo de Dataform y lo ejecuta con la frecuencia que hayas definido.
Crear un flujo de trabajo de orquestación programado
Para programar ejecuciones de tu flujo de trabajo de Dataform, usa Workflows para crear un flujo de trabajo de orquestación y añade una tarea de Cloud Scheduler como activador.
Workflows usa cuentas de servicio para dar acceso a los workflows a losGoogle Cloud recursos. Crea una cuenta de servicio y concédele el rol de gestión de identidades y accesos Editor de Dataform (
roles/dataform.editor
), así como los permisos mínimos necesarios para gestionar tu flujo de trabajo de orquestación. Para obtener más información, consulta Conceder permiso a un flujo de trabajo para acceder a recursos Google Cloud .Crea un flujo de trabajo de orquestación y usa el siguiente código fuente YAML como definición del flujo de trabajo:
main: steps: - init: assign: - repository: projects/PROJECT_ID/locations/REPOSITORY_LOCATION/repositories/REPOSITORY_ID - createCompilationResult: call: http.post args: url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"} auth: type: OAuth2 body: gitCommitish: GIT_COMMITISH result: compilationResult - createWorkflowInvocation: call: http.post args: url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"} auth: type: OAuth2 body: compilationResult: ${compilationResult.body.name} result: workflowInvocation - complete: return: ${workflowInvocation.body.name}
Haz los cambios siguientes:
- PROJECT_ID: el ID de tu proyecto de Google Cloud .
- REPOSITORY_LOCATION: la ubicación de tu repositorio de Dataform.
- REPOSITORY_ID: el nombre de tu repositorio de Dataform.
- GIT_COMMITISH: la rama de Git desde la que quieres ejecutar el código de Dataform. En el caso de un repositorio recién creado, sustitúyelo por
main
.
Programa el flujo de trabajo de orquestación con Cloud Scheduler.
Personalizar la solicitud de resultado de compilación de creación de flujo de trabajo de Dataform
Puedes actualizar el flujo de trabajo de orquestación y definir los ajustes de la solicitud de resultado de compilación de creación del flujo de trabajo de Dataform en formato YAML. Para obtener más información sobre los ajustes, consulta la referencia del recurso REST projects.locations.repositories.compilationResults
.
Por ejemplo, para añadir un ajuste de _dev
schemaSuffix
a todas las acciones durante la compilación, sustituya el cuerpo del paso createCompilationResult
por el siguiente fragmento de código:
- createCompilationResult:
call: http.post
args:
url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
auth:
type: OAuth2
body:
gitCommitish: GIT_COMMITISH
codeCompilationConfig:
schemaSuffix: dev
También puedes transmitir ajustes adicionales como argumentos de tiempo de ejecución en una solicitud de ejecución de Workflows y acceder a esos argumentos mediante variables. Para obtener más información, consulta Transferir argumentos del entorno de ejecución en una solicitud de ejecución.
Personalizar la solicitud de invocación del flujo de trabajo de Dataform
Puedes actualizar el flujo de trabajo de orquestación y definir los ajustes de la solicitud de invocación del flujo de trabajo de Dataform en formato YAML. Para obtener más información sobre los ajustes de la solicitud de invocación, consulta la referencia del recurso REST projects.locations.repositories.workflowInvocations
.
Por ejemplo, para ejecutar solo las acciones con la etiqueta hourly
con todas las dependencias transitivas incluidas, sustituye el cuerpo de createWorkflowInvocation
por el siguiente fragmento de código:
- createWorkflowInvocation:
call: http.post
args:
url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
auth:
type: OAuth2
body:
compilationResult: ${compilationResult.body.name}
invocationConfig:
includedTags:
- hourly
transitiveDependenciesIncluded: true
También puedes transmitir ajustes adicionales como argumentos de tiempo de ejecución en una solicitud de ejecución de Workflows y acceder a esos argumentos mediante variables. Para obtener más información, consulta Transferir argumentos del entorno de ejecución en una solicitud de ejecución.
Programar ejecuciones con Cloud Composer
Puedes usar Cloud Composer 2 para programar ejecuciones de Dataform. Dataform no es compatible con Cloud Composer 1.
Para gestionar las programaciones de las ejecuciones de Dataform con Cloud Composer 2, puedes usar operadores de Dataform en grafos acíclicos dirigidos (DAGs) de Airflow. Puedes crear un DAG de Airflow que programe invocaciones de flujos de trabajo de Dataform.
Dataform proporciona varios operadores de Airflow. Entre ellos, se incluyen operadores para obtener un resultado de compilación, obtener una invocación de flujo de trabajo y cancelar una invocación de flujo de trabajo. Para ver la lista completa de operadores de Dataform Airflow disponibles, consulta Operadores de Google Dataform.
Instalar el paquete google-cloud-dataform
de PyPI
Si usas versiones de Cloud Composer 2 2.0.25
o posteriores, este paquete
está preinstalado en tu entorno. No es necesario instalarla.
Si usas versiones anteriores de Cloud Composer 2, instala el paquete google-cloud-dataform
PyPi.
En la sección de paquetes de PyPI, especifica la versión ==0.2.0
.
Crear un DAG de Airflow que programe invocaciones de flujos de trabajo de Dataform
Para gestionar las ejecuciones programadas de los flujos de trabajo de Dataform con Cloud Composer 2, escribe el DAG con operadores de Airflow de Dataform y, a continuación, súbelo al bucket de tu entorno.
En el siguiente ejemplo de código se muestra un DAG de Airflow que crea un resultado de compilación de Dataform e inicia una invocación de flujo de trabajo de Dataform:
from datetime import datetime
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
)
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
},
)
create_compilation_result >> create_workflow_invocation
Haz los cambios siguientes:
- PROJECT_ID: el ID de tu proyecto de Dataform. Google Cloud
- REPOSITORY_ID: el nombre de tu repositorio de Dataform.
- REGION: la región en la que se encuentra el repositorio de Dataform.
- COMPILATION_RESULT: el nombre del resultado de la compilación que quieras usar en esta invocación del flujo de trabajo.
- GIT_COMMITISH: el commitish de Git en el repositorio de Git remoto de la versión del código que quieras usar (por ejemplo, una rama o un SHA de Git).
En el siguiente ejemplo de código se muestra un DAG de Airflow que hace lo siguiente:
- Crea un resultado de compilación de Dataform.
- Inicia una invocación asíncrona de un flujo de trabajo de Dataform.
- Consulta el estado de tu flujo de trabajo hasta que alcance el estado esperado
mediante
DataformWorkflowInvocationStateSensor
.
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
asynchronous=True,
workflow_invocation={
"compilation_result": COMPILATION_RESULT
}
)
is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
task_id="is_workflow_invocation_done",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)
create_compilation_result >> create_workflow_invocation
Haz los cambios siguientes:
- PROJECT_ID: tu Google Cloud projectID de Dataform.
- REPOSITORY_ID: el nombre de tu repositorio de Dataform.
- REGION: la región en la que se encuentra el repositorio de Dataform.
- COMPILATION_RESULT: el nombre del resultado de la compilación que quieras usar en esta invocación del flujo de trabajo.
- GIT_COMMITISH: el commitish de Git en el repositorio de Git remoto de la versión del código que quieras usar (por ejemplo, una rama o un SHA de Git).
- COMPILATION_RESULT: el nombre del resultado de la compilación que quieras usar en esta invocación del flujo de trabajo.
Añadir parámetros de configuración de compilación
Puedes añadir parámetros de configuración de compilación adicionales al objeto create_compilation_result
DAG de Airflow. Para obtener más información sobre los parámetros disponibles, consulte la CodeCompilationConfig
referencia de la API Dataform.
Para añadir parámetros de configuración de compilación al objeto
create_compilation_result
DAG de Airflow, añade los parámetros que hayas seleccionado al campocode_compilation_config
con el siguiente formato:create_compilation_result = DataformCreateCompilationResultOperator( task_id="create_compilation_result", project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, compilation_result={ "git_commitish": GIT_COMMITISH, "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"} }, )
Haz los cambios siguientes:
- PROJECT_ID: el ID de tu proyecto de Dataform. Google Cloud
- REPOSITORY_ID: el nombre de tu repositorio de Dataform.
- REGION: la región en la que se encuentra el repositorio de Dataform.
- GIT_COMMITISH: el commitish de Git en el repositorio de Git remoto de la versión del código que quieras usar (por ejemplo, una rama o un SHA de Git).
- PARAMETER: el parámetro
CodeCompilationConfig
seleccionado. Puedes añadir varios parámetros. - PARAMETER_VALUE: el valor del parámetro seleccionado.
En el siguiente ejemplo de código se muestra el parámetro defaultDatabase
añadido al objeto DAG de create_compilation_result
Airflow:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": REMOTE_BRANCH,
"code_compilation_config": { "default_database": "my-custom-gcp-project"}
},
)
Añadir parámetros de configuración de invocación de flujo de trabajo
Puedes añadir parámetros de configuración de invocación de flujo de trabajo adicionales al objeto create_workflow_invocation
DAG de Airflow. Para obtener más información sobre los parámetros disponibles, consulte la InvocationConfig
referencia de la API Dataform.
Para añadir parámetros de configuración de invocación de flujo de trabajo al objeto DAG de Airflow
create_workflow_invocation
, añade los parámetros seleccionados al campoinvocation_config
con el siguiente formato:create_workflow_invocation = DataformCreateWorkflowInvocationOperator( task_id='create_workflow_invocation', project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, workflow_invocation={ "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}", "invocation_config": { "PARAMETER": PARAMETER_VALUE } }, )
Haz los cambios siguientes:
- PROJECT_ID: el ID de tu proyecto de Dataform. Google Cloud
- REPOSITORY_ID: el nombre de tu repositorio de Dataform.
- REGION: la región en la que se encuentra el repositorio de Dataform.
- PARAMETER: el parámetro
InvocationConfig
seleccionado. Puedes añadir varios parámetros. - PARAMETER_VALUE: el valor del parámetro seleccionado.
En el siguiente ejemplo de código se muestran los parámetros includedTags[]
y transitiveDependenciesIncluded
añadidos al objeto create_workflow_invocation
DAG de Airflow:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "included_tags": ["daily"], "transitive_dependencies_included": true }
},
)
Siguientes pasos
- Para saber cómo configurar las configuraciones de lanzamiento de compilación de Dataform, consulta Crear una configuración de lanzamiento.
- Para obtener más información sobre el ciclo de vida del código de Dataform, consulta el artículo Introducción al ciclo de vida del código en Dataform.
- Para obtener más información sobre la API Dataform, consulta API Dataform.
- Para obtener más información sobre los entornos de Cloud Composer, consulta la descripción general de Cloud Composer.
- Para obtener más información sobre los precios de Workflows, consulta la página Precios de Workflows.