En este documento, se muestra cómo hacer lo siguiente en Dataform:
- Programa ejecuciones con configuraciones de flujo de trabajo.
- Programa ejecuciones con Workflows y Cloud Scheduler.
- Programa ejecuciones con Cloud Composer.
Antes de comenzar
Para programar ejecuciones con configuraciones de flujos de trabajo o programar ejecuciones con flujos de trabajo y Cloud Scheduler, haz lo siguiente:
En la consola de Google Cloud , ve a la página Dataform.
Selecciona o crea un repositorio.
Crea una configuración de lanzamiento.
Para programar ejecuciones con Cloud Composer, haz lo siguiente:
- Selecciona o crea un repositorio de Dataform.
- Otorgar acceso de Dataform a BigQuery.
- Selecciona o crea un espacio de trabajo de Dataform.
- Crea al menos una tabla.
- Crea un entorno de Cloud Composer 2.
Roles obligatorios
Para obtener los permisos que necesitas para completar las tareas de este documento, pídele a tu administrador que te otorgue los siguientes roles de IAM:
-
Administrador de Dataform (
roles/dataform.admin
) en repositorios -
Trabajador de Composer (
roles/composer.worker
) en la cuenta de servicio del entorno de Cloud Composer
Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.
También puedes obtener los permisos necesarios mediante roles personalizados o cualquier otro rol predefinido.
Para usar una cuenta de servicio distinta de la cuenta de servicio predeterminada de Dataform, otorga acceso a la cuenta de servicio personalizada.
Para habilitar las ejecuciones programadas de una configuración de flujo de trabajo cuando está habilitado el modo de actuar como estricto, debes otorgar el permiso iam.serviceAccounts.actAs
a la cuenta de servicio de Dataform para la cuenta de servicio que se usa en la configuración del flujo de trabajo. Este permiso está disponible en el rol de usuario de cuenta de servicio (roles/iam.serviceAccountUser
).
Para usar las credenciales de usuario de la Cuenta de Google cuando crees una configuración de flujo de trabajo (Versión preliminar), otorga acceso a la Cuenta de Google.
Programa ejecuciones con configuraciones de flujo de trabajo
En esta sección, se muestra cómo crear una configuración de flujo de trabajo en Dataform para programar y configurar ejecuciones de flujos de trabajo. Puedes usar configuraciones de flujo de trabajo para ejecutar flujos de trabajo de Dataform de manera programada.
Información acerca de las configuraciones de flujo de trabajo
Para programar ejecuciones de Dataform de todas las acciones de flujo de trabajo o de las seleccionadas en BigQuery, puedes crear parámetros de configuración de flujo de trabajo. En una configuración de flujo de trabajo, seleccionas una configuración de lanzamiento de compilación, seleccionas acciones de flujo de trabajo para la ejecución y estableces el programa de ejecución.
Luego, durante una ejecución programada de la configuración de tu flujo de trabajo, Dataform implementa en BigQuery la selección de acciones del resultado de compilación más reciente de tu configuración de lanzamiento. También puedes activar manualmente la ejecución de una configuración de flujo de trabajo con workflowConfigs de la API de Dataform.
Una configuración de flujo de trabajo de Dataform contiene la siguiente configuración de ejecución:
- Es el ID de la configuración del flujo de trabajo.
- Configuración de lanzamiento.
Cuenta de servicio.
Esta es la cuenta de servicio asociada con la configuración del flujo de trabajo. Puedes seleccionar la cuenta de servicio de Dataform predeterminada o una cuenta de servicio asociada con tu Google Cloud proyecto, o puedes ingresar manualmente una cuenta de servicio diferente. De forma predeterminada, las configuraciones de flujo de trabajo usan las mismas cuentas de servicio que sus repositories.
Las credenciales de la cuenta de servicio son el método de autorización predeterminado para la creación y ejecución de la configuración de flujos de trabajo programados.
Credenciales de usuario de la Cuenta de Google (Versión preliminar)
Las credenciales de usuario de la Cuenta de Google son el método de autorización predeterminado para la creación y ejecución de configuraciones de flujos de trabajo manuales y no programados. Para obtener más información, consulta Cómo autorizar tu Cuenta de Google.
Acciones del flujo de trabajo que se ejecutarán:
- Todas las acciones.
- Selección de acciones.
- Selección de etiquetas.
Programa de ejecución y zona horaria
Crea una configuración de flujo de trabajo
Para crear una configuración de flujo de trabajo de Dataform, sigue estos pasos:
- En el repositorio, ve a Lanzamientos y programación.
- En la sección Configuraciones de flujo de trabajo, haz clic en Crear.
En el panel Crear configuración de flujo de trabajo, en el campo ID de configuración, ingresa un ID único para la configuración del flujo de trabajo.
Los IDs solo pueden incluir números, letras, guiones y guiones bajos.
En el menú Configuración de lanzamiento, selecciona una configuración de lanzamiento de compilación.
En la sección Authentication, autoriza la configuración del flujo de trabajo con las credenciales de usuario de tu Cuenta de Google o una cuenta de servicio.
- Para usar las credenciales de usuario de tu Cuenta de Google (Versión preliminar), selecciona Ejecutar con mis credenciales de usuario.
- Para usar una cuenta de servicio, selecciona Ejecutar con la cuenta de servicio seleccionada y, luego, selecciona la cuenta de servicio predeterminada de Dataform o cualquier cuenta de servicio asociada con tu proyectoGoogle Cloud al que tengas acceso. Si no seleccionas una cuenta de servicio, la configuración del flujo de trabajo usa la cuenta de servicio del repositorio.
Opcional: En el campo Programar frecuencia, ingresa la frecuencia de las ejecuciones en el formato unix-cron.
Para verificar que Dataform ejecute el resultado de compilación más reciente en la configuración de lanzamiento correspondiente, mantén un descanso mínimo de una hora entre el momento de la creación del resultado de compilación y el momento de la ejecución programada.
Opcional: En el menú Zona horaria, selecciona la zona horaria para las ejecuciones.
La zona horaria predeterminada es UTC.
Selecciona las acciones del flujo de trabajo que se ejecutarán:
- Para ejecutar todo el flujo de trabajo, haz clic en Todas las acciones.
- Para ejecutar las acciones seleccionadas en el flujo de trabajo, haz clic en Selección de acciones y, luego, selecciona las acciones.
- Para ejecutar acciones con etiquetas seleccionadas, haz clic en Selección de etiquetas y, luego, selecciona las etiquetas.
- Opcional: Para ejecutar las acciones o etiquetas seleccionadas y sus dependencias, selecciona la opción Incluir dependencias.
- Opcional: Para ejecutar las acciones o etiquetas seleccionadas y sus dependencias, selecciona la opción Incluir dependencias.
- Opcional: Para volver a compilar todas las tablas desde cero, selecciona la opción Run with full refresh.
Sin esta opción, Dataform actualiza las tablas incrementales sin volver a compilarlas desde cero.
Haz clic en Crear. Si seleccionaste Ejecutar con mis credenciales de usuario para tu método de autenticación, debes autorizar tu Cuenta de Google (Versión preliminar).
Por ejemplo, la siguiente configuración de flujo de trabajo ejecuta acciones con la etiqueta hourly
cada hora en la zona horaria CEST:
- ID de configuración:
production-hourly
- Configuración de lanzamiento: -
- Frecuencia:
0 * * * *
- Zona horaria:
Central European Summer Time (CEST)
- Selección de acciones de flujo de trabajo: selección de etiquetas, etiqueta
hourly
Autoriza tu Cuenta de Google
Para autenticar el recurso con las credenciales de usuario de tu Cuenta de Google, debes otorgar permiso de forma manual a las canalización de BigQuery para que obtengan el token de acceso de tu Cuenta de Google y accedan a los datos de origen en tu nombre. Puedes otorgar la aprobación manual con la interfaz de diálogo de OAuth.
Solo debes otorgar permiso a las canalizaciones de BigQuery una vez.
Para revocar el permiso que otorgaste, sigue estos pasos:
- Ve a la página de tu Cuenta de Google.
- Haz clic en Canales de BigQuery.
- Haga clic en Quitar acceso.
Cambiar el propietario de la configuración del flujo de trabajo mediante la actualización de las credenciales también requiere aprobación manual si el nuevo propietario de la Cuenta de Google nunca antes creó una configuración de flujo de trabajo.
Edita la configuración de un flujo de trabajo
Para editar la configuración de un flujo de trabajo, sigue estos pasos:
- En el repositorio, ve a Lanzamientos y programación.
- En la configuración del flujo de trabajo que deseas editar, haz clic en el menú Más y, luego, en Editar.
- En el panel Editar configuración del flujo de trabajo, edita la configuración de la configuración de lanzamiento y, luego, haz clic en Guardar.
Borra la configuración de un flujo de trabajo
Para borrar la configuración de un flujo de trabajo, sigue estos pasos:
- En el repositorio, ve a Lanzamientos y programación.
- En la configuración del flujo de trabajo que deseas borrar, haz clic en el menú Más y, luego, en Borrar.
- En el diálogo Borrar configuración de lanzamiento, haz clic en Borrar.
Programa ejecuciones con Workflows y Cloud Scheduler
En esta sección, se muestra cómo programar ejecuciones de flujos de trabajo de Dataform con Workflows y Cloud Scheduler.
Acerca de las ejecuciones de flujos de trabajo programados
Puedes configurar la frecuencia de las ejecuciones de flujo de trabajo de Dataform si creas un trabajo de Cloud Scheduler que active un flujo de trabajo de Workflows. Workflows ejecuta servicios en un flujo de trabajo de organización que tú defines.
Workflows ejecuta tu flujo de trabajo de Dataform en un proceso de dos pasos. Primero, extrae el código del repositorio de Dataform de tu proveedor de Git y lo compila en un resultado de compilación. Luego, usa el resultado de la compilación para crear un flujo de trabajo de Dataform y lo ejecuta con la frecuencia que establezcas.
Crea un flujo de trabajo de orquestación programado
Para programar ejecuciones de tu flujo de trabajo de Dataform, usa Workflows para crear un flujo de trabajo de orquestación y agregar un trabajo de Cloud Scheduler como activador.
Workflows usa cuentas de servicio para otorgarles acceso a los recursos deGoogle Cloud . Crea una cuenta de servicio y bríndale el rol de Identity and Access Management de Editor de Dataform (
roles/dataform.editor
), así como los permisos mínimos necesarios para administrar tu flujo de trabajo de orquestación. Para obtener más información, consulta Otorga permiso a un flujo de trabajo para acceder a los recursos de Google Cloud .Crea un flujo de trabajo de orquestación y usa el siguiente código fuente de YAML como definición de flujo de trabajo:
main: steps: - init: assign: - repository: projects/PROJECT_ID/locations/REPOSITORY_LOCATION/repositories/REPOSITORY_ID - createCompilationResult: call: http.post args: url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"} auth: type: OAuth2 body: gitCommitish: GIT_COMMITISH result: compilationResult - createWorkflowInvocation: call: http.post args: url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"} auth: type: OAuth2 body: compilationResult: ${compilationResult.body.name} result: workflowInvocation - complete: return: ${workflowInvocation.body.name}
Reemplaza lo siguiente:
- PROJECT_ID: Es el ID de tu Google Cloud proyecto.
- REPOSITORY_LOCATION: Es la ubicación de tu repositorio de Dataform.
- REPOSITORY_ID: Es el nombre de tu repositorio de Dataform.
- GIT_COMMITISH: Es la rama de Git desde la que deseas ejecutar el código de Dataform. Para un repositorio recién creado, reemplázalo por
main
.
Programa el flujo de trabajo de orquestación con Cloud Scheduler.
Personaliza la solicitud de resultado de compilación de creación del flujo de trabajo de Dataform
Puedes
actualizar el flujo de trabajo de orquestación existente
y definir la configuración de la solicitud de resultado de compilación de creación del flujo de trabajo de Dataform
en el formato YAML. Para obtener más información sobre la configuración, consulta la referencia de recursos REST de projects.locations.repositories.compilationResults
.
Por ejemplo, para agregar una configuración schemaSuffix
_dev
a todas las acciones durante la compilación, reemplaza el cuerpo del paso createCompilationResult
por el siguiente fragmento de código:
- createCompilationResult:
call: http.post
args:
url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
auth:
type: OAuth2
body:
gitCommitish: GIT_COMMITISH
codeCompilationConfig:
schemaSuffix: dev
También puedes pasar parámetros de configuración adicionales como argumentos de tiempo de ejecución en una solicitud de ejecución de Workflows y acceder a esos argumentos con variables. Para obtener más información, consulta Cómo pasar argumentos del entorno de ejecución en una solicitud de ejecución.
Personaliza la solicitud de invocación del flujo de trabajo de Dataform
Puedes
actualizar el flujo de trabajo de orquestación existente
y definir la configuración de la solicitud de invocación del flujo de trabajo de Dataform en
el formato YAML. Para obtener más información sobre la configuración de la solicitud de invocación, consulta la referencia de recursos REST de projects.locations.repositories.workflowInvocations
.
Por ejemplo, para ejecutar solo acciones con la etiqueta hourly
con todas las dependencias transitivas incluidas, reemplaza el cuerpo createWorkflowInvocation
por el siguiente fragmento de código:
- createWorkflowInvocation:
call: http.post
args:
url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
auth:
type: OAuth2
body:
compilationResult: ${compilationResult.body.name}
invocationConfig:
includedTags:
- hourly
transitiveDependenciesIncluded: true
También puedes pasar parámetros de configuración adicionales como argumentos de tiempo de ejecución en una solicitud de ejecución de Workflows y acceder a esos argumentos con variables. Para obtener más información, consulta Cómo pasar argumentos del entorno de ejecución en una solicitud de ejecución.
Programa ejecuciones con Cloud Composer
Puedes usar Cloud Composer 2 para programar ejecuciones de Dataform. Dataform no es compatible con Cloud Composer 1.
Para administrar las programaciones de las ejecuciones de Dataform con Cloud Composer 2, puedes usar operadores de Dataform en los grafos acíclicos dirigidos (DAG) de Airflow. Puedes crear un DAG de Airflow que programe invocaciones de flujos de trabajo de Dataform.
Dataform proporciona varios operadores de Airflow. Estos incluyen operadores para obtener un resultado de compilación, obtener una invocación de flujo de trabajo y cancelar una invocación de flujo de trabajo. Para ver la lista completa de operadores de Dataform Airflow disponibles, consulta Operadores de Google Dataform.
Instala el paquete google-cloud-dataform
de PyPI
Si usas las versiones 2.0.25
y posteriores de Cloud Composer 2, este paquete está preinstalado en tu entorno. No es necesario que la instales.
Si usas versiones anteriores de Cloud Composer 2, instala el paquete google-cloud-dataform
de PyPI.
En la sección de paquetes de PyPI, especifica la versión ==0.2.0
.
Crea un DAG de Airflow que programe invocaciones de flujos de trabajo de Dataform
Para administrar ejecuciones programadas de flujos de trabajo de Dataform con Cloud Composer 2, escribe el DAG con operadores de Airflow de Dataform y, luego, súbelo al bucket de tu entorno.
En la siguiente muestra de código, se muestra un DAG de Airflow que crea un resultado de compilación de Dataform y comienza una invocación de flujo de trabajo de Dataform:
from datetime import datetime
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
)
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
},
)
create_compilation_result >> create_workflow_invocation
Reemplaza lo siguiente:
- PROJECT_ID: El ID de tu proyecto de Google Cloud Dataform.
- REPOSITORY_ID: Es el nombre de tu repositorio de Dataform.
- REGION: La región en la que se encuentra el repositorio de Dataform.
- COMPILATION_RESULT: Es el nombre del resultado de la compilación que deseas usar para esta invocación de flujo de trabajo.
- GIT_COMMITISH: El commit de Git en el repositorio remoto de Git de la versión del código que deseas usar, por ejemplo, una rama o un SHA de Git.
En la siguiente muestra de código, se muestra un DAG de Airflow que realiza las siguientes acciones:
- Crea un resultado de compilación de Dataform.
- Inicia una invocación asíncrona del flujo de trabajo de Dataform.
- Consulta el estado de tu flujo de trabajo hasta que ingrese al estado esperado con
DataformWorkflowInvocationStateSensor
.
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
asynchronous=True,
workflow_invocation={
"compilation_result": COMPILATION_RESULT
}
)
is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
task_id="is_workflow_invocation_done",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)
create_compilation_result >> create_workflow_invocation
Reemplaza lo siguiente:
- PROJECT_ID: El Google Cloud ID del proyecto de Dataform.
- REPOSITORY_ID: Es el nombre de tu repositorio de Dataform.
- REGION: La región en la que se encuentra el repositorio de Dataform.
- COMPILATION_RESULT: Es el nombre del resultado de la compilación que deseas usar para esta invocación de flujo de trabajo.
- GIT_COMMITISH: El commit de Git en el repositorio remoto de Git de la versión del código que deseas usar, por ejemplo, una rama o un SHA de Git.
- COMPILATION_RESULT: Es el nombre del resultado de la compilación que deseas usar para esta invocación de flujo de trabajo.
Agrega parámetros de configuración de compilación
Puedes agregar parámetros de configuración de compilación adicionales al objeto DAG de create_compilation_result
de Airflow. Para obtener más información sobre los parámetros disponibles, consulta la referencia de la API de Dataform de CodeCompilationConfig
.
Para agregar parámetros de configuración de compilación al objeto DAG de Airflow
create_compilation_result
, agrega los parámetros seleccionados al campocode_compilation_config
en el siguiente formato:create_compilation_result = DataformCreateCompilationResultOperator( task_id="create_compilation_result", project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, compilation_result={ "git_commitish": GIT_COMMITISH, "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"} }, )
Reemplaza lo siguiente:
- PROJECT_ID: El ID de tu proyecto de Google Cloud Dataform.
- REPOSITORY_ID: Es el nombre de tu repositorio de Dataform.
- REGION: La región en la que se encuentra el repositorio de Dataform.
- GIT_COMMITISH: El commit de Git en el repositorio remoto de Git de la versión del código que deseas usar, por ejemplo, una rama o un SHA de Git.
- PARAMETER: Es el parámetro
CodeCompilationConfig
seleccionado. Puedes agregar varios parámetros. - PARAMETER_VALUE: Es el valor del parámetro seleccionado.
En la siguiente muestra de código, se muestra el parámetro defaultDatabase
agregado al objeto DAG create_compilation_result
de Airflow:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": REMOTE_BRANCH,
"code_compilation_config": { "default_database": "my-custom-gcp-project"}
},
)
Agrega parámetros de configuración de invocación de flujo de trabajo
Puedes agregar parámetros de configuración de invocación de flujo de trabajo adicionales al objeto DAG de create_workflow_invocation
de Airflow. Para obtener más información sobre los parámetros disponibles, consulta la referencia de la API de Dataform de InvocationConfig
.
Para agregar parámetros de configuración de invocación de flujo de trabajo al objeto DAG de
create_workflow_invocation
Airflow, agrega los parámetros seleccionados al campoinvocation_config
en el siguiente formato:create_workflow_invocation = DataformCreateWorkflowInvocationOperator( task_id='create_workflow_invocation', project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, workflow_invocation={ "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}", "invocation_config": { "PARAMETER": PARAMETER_VALUE } }, )
Reemplaza lo siguiente:
- PROJECT_ID: El ID de tu proyecto de Google Cloud Dataform.
- REPOSITORY_ID: Es el nombre de tu repositorio de Dataform.
- REGION: La región en la que se encuentra el repositorio de Dataform.
- PARAMETER: Es el parámetro
InvocationConfig
seleccionado. Puedes agregar varios parámetros. - PARAMETER_VALUE: Es el valor del parámetro seleccionado.
En la siguiente muestra de código, se muestran los parámetros includedTags[]
y transitiveDependenciesIncluded
agregados al objeto DAG de Airflow create_workflow_invocation
:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "included_tags": ["daily"], "transitive_dependencies_included": true }
},
)
¿Qué sigue?
- Para aprender a configurar las configuraciones de lanzamiento de compilación de Dataform, consulta Cómo crear una configuración de lanzamiento.
- Para obtener más información sobre el ciclo de vida del código de Dataform, consulta Introducción al ciclo de vida del código en Dataform.
- Para obtener más información sobre la API de Dataform, consulta la API de Dataform.
- Para obtener más información sobre los entornos de Cloud Composer, consulta la Descripción general de Cloud Composer.
- Para obtener más información sobre los precios de Workflows, consulta Precios de Workflows.