Este documento mostra como fazer o seguinte no Dataform:
- Programar execuções com configurações de fluxo de trabalho.
- Programar execuções com o Workflows e o Cloud Scheduler.
- Programar execuções com o Cloud Composer.
Antes de começar
Para programar execuções com configurações de fluxo de trabalho ou programar execuções com fluxos de trabalho e o Cloud Scheduler, faça o seguinte:
No Google Cloud console, acesse a página Dataform.
Selecione ou crie um repositório.
Crie uma configuração de versão.
Para programar execuções com o Cloud Composer, faça o seguinte:
- Selecione ou crie um repositório do Dataform.
- Conceder acesso ao Dataform ao BigQuery.
- Selecione ou crie um espaço de trabalho do Dataform.
- Crie pelo menos uma tabela.
- Crie um ambiente do Cloud Composer 2.
Funções exigidas
Para receber as permissões necessárias para concluir as tarefas neste documento, peça ao administrador para conceder a você os seguintes papéis do IAM:
-
Administrador do Dataform (
roles/dataform.admin
) em repositórios -
Worker do Composer (
roles/composer.worker
) na conta de serviço do ambiente do Cloud Composer
Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.
Também é possível conseguir as permissões necessárias usando papéis personalizados ou outros papéis predefinidos.
Para usar uma conta de serviço diferente da conta de serviço padrão do Dataform, conceda acesso à conta de serviço personalizada.
Para ativar as execuções programadas de uma configuração de fluxo de trabalho quando o modo estrito de agir como está ativado, conceda a permissão iam.serviceAccounts.actAs
à conta de serviço do Dataform para a conta de serviço usada na configuração do fluxo de trabalho. Essa permissão está disponível no papel de Usuário da conta de serviço (roles/iam.serviceAccountUser
).
Para usar as credenciais de usuário da Conta do Google ao criar uma configuração de fluxo de trabalho (Prévia), conceda acesso à Conta do Google.
Programar execuções com configurações de fluxo de trabalho
Nesta seção, mostramos como criar uma configuração de fluxo de trabalho no Dataform para programar e configurar execuções de fluxo de trabalho. É possível usar configurações de fluxo de trabalho para executar fluxos de trabalho do Dataform de acordo com uma programação.
Sobre as configurações de fluxo de trabalho
Para programar execuções do Dataform de todas ou de algumas ações de fluxo de trabalho no BigQuery, crie configurações de fluxo de trabalho. Em uma configuração de fluxo de trabalho, você seleciona uma configuração de lançamento de compilação, escolhe ações de fluxo de trabalho para execução e define a programação de execução.
Em seguida, durante uma execução programada da configuração do fluxo de trabalho, o Dataform implanta sua seleção de ações do resultado da compilação mais recente na configuração de lançamento para o BigQuery. Também é possível acionar manualmente a execução de uma configuração de fluxo de trabalho com a API Dataform workflowConfigs.
Uma configuração de fluxo de trabalho do Dataform contém as seguintes configurações de execução:
- ID da configuração do fluxo de trabalho.
- Configuração de lançamento.
Conta de serviço.
Essa é a conta de serviço associada à configuração do fluxo de trabalho. Você pode selecionar a conta de serviço padrão do Dataform ou uma conta associada ao seu projeto do Google Cloud . Também é possível inserir manualmente uma conta de serviço diferente. Por padrão, as configurações de fluxo de trabalho usam as mesmas contas de serviço que os repositórios.
As credenciais da conta de serviço são o método de autorização padrão para criação e execução de configurações de fluxo de trabalho programado.
Credenciais de usuário da Conta do Google (prévia)
As credenciais de usuário da Conta do Google são o método de autorização padrão para criações e execuções de configuração de fluxo de trabalho manuais e não programadas. Para mais informações, consulte Autorizar sua Conta do Google.
Ações do fluxo de trabalho a serem executadas:
- Todas as ações.
- Seleção de ações.
- Seleção de tags.
Programação de execução e fuso horário.
Criar uma configuração de fluxo de trabalho
Para criar uma configuração de fluxo de trabalho do Dataform, siga estas etapas:
- No repositório, acesse Lançamentos e programação.
- Na seção Configurações do fluxo de trabalho, clique em Criar.
No painel Criar configuração do fluxo de trabalho, no campo ID da configuração, insira um ID exclusivo para a configuração do fluxo de trabalho.
Os IDs podem incluir apenas números, letras, hifens e sublinhados.
No menu Configuração da versão, selecione uma configuração de versão de compilação.
Na seção Autenticação, autorize a configuração do fluxo de trabalho com as credenciais de usuário da sua Conta do Google ou uma conta de serviço.
- Para usar as credenciais de usuário da sua Conta do Google (Prévia), selecione Executar com minhas credenciais de usuário.
- Para usar uma conta de serviço, selecione Executar com a conta de serviço selecionada e escolha a conta de serviço padrão do Dataform ou qualquer conta de serviço associada ao seu projetoGoogle Cloud a que você tem acesso. Se você não selecionar uma conta de serviço, a configuração do fluxo de trabalho usará a conta de serviço do repositório.
Opcional: no campo Frequência de programação, insira a frequência de execuções no formato unix-cron.
Para verificar se o Dataform executa o resultado da compilação mais recente na configuração de lançamento correspondente, mantenha um intervalo mínimo de uma hora entre o momento da criação do resultado da compilação e o momento da execução programada.
Opcional: no menu Fuso horário, selecione o fuso horário das execuções.
O fuso horário padrão é UTC.
Selecione as ações do fluxo de trabalho a serem executadas:
- Para executar todo o fluxo de trabalho, clique em Todas as ações.
- Para executar as ações selecionadas no fluxo de trabalho, clique em Seleção de ações e escolha as ações.
- Para executar ações com as tags selecionadas, clique em Seleção de tags e escolha as tags.
- Opcional: para executar as ações ou tags selecionadas e as dependências delas, marque a opção Incluir dependências.
- Opcional: para executar as ações ou tags selecionadas e os dependentes delas, selecione a opção Incluir dependentes.
- Opcional: para recriar todas as tabelas do zero, selecione a opção Executar com atualização completa.
Sem essa opção, o Dataform atualiza as tabelas incrementais sem recriá-las do zero.
Clique em Criar. Se você selecionou Executar com minhas credenciais de usuário como método de autenticação, autorize sua Conta do Google (Prévia).
Por exemplo, a configuração de fluxo de trabalho a seguir executa ações com a
tag hourly
a cada hora no fuso horário CEST:
- ID da configuração:
production-hourly
- Configuração da versão: -
- Frequência:
0 * * * *
- Fuso horário:
Central European Summer Time (CEST)
- Seleção de ações do fluxo de trabalho: seleção de tags, tag
hourly
Autorizar sua Conta do Google
Para autenticar o recurso com as credenciais de usuário da sua Conta do Google, conceda permissão manualmente para que os pipelines do BigQuery recebam o token de acesso da sua Conta do Google e acessem os dados de origem em seu nome. É possível conceder aprovação manual com a interface da caixa de diálogo do OAuth.
Você só precisa conceder permissão aos pipelines do BigQuery uma vez.
Para revogar a permissão concedida, siga estas etapas:
- Acesse a página da sua Conta do Google.
- Clique em Pipelines do BigQuery.
- Clique em Remover acesso.
Alterar o proprietário da configuração do fluxo de trabalho atualizando as credenciais também requer aprovação manual se o novo proprietário da Conta do Google nunca tiver criado uma configuração de fluxo de trabalho antes.
Editar uma configuração de fluxo de trabalho
Para editar uma configuração de fluxo de trabalho, siga estas etapas:
- No repositório, acesse Lançamentos e programação.
- Na configuração do fluxo de trabalho que você quer editar, clique no menu Mais e em Editar.
- No painel Editar configuração do fluxo de trabalho, edite as configurações da configuração de versão e clique em Salvar.
Excluir uma configuração de fluxo de trabalho
Para excluir uma configuração de fluxo de trabalho, siga estas etapas:
- No repositório, acesse Lançamentos e programação.
- Na configuração do fluxo de trabalho que você quer excluir, clique no menu Mais e em Excluir.
- Na caixa de diálogo Excluir configuração da versão, clique em Excluir.
Programar execuções com o Workflows e o Cloud Scheduler
Nesta seção, mostramos como programar execuções de fluxos de trabalho do Dataform usando o Workflows e o Cloud Scheduler.
Sobre execuções de fluxo de trabalho programadas
É possível definir a frequência das execuções do fluxo de trabalho do Dataform criando um job do Cloud Scheduler que aciona um fluxo de trabalho do Workflows. O Workflows executa serviços em um fluxo de trabalho de orquestração definido por você.
O Workflows executa seu fluxo de trabalho do Dataform em um processo de duas etapas. Primeiro, ele extrai o código do repositório do Dataform do seu provedor Git e o compila em um resultado de compilação. Em seguida, ele usa o resultado da compilação para criar um fluxo de trabalho do Dataform e o executa na frequência definida.
Criar um fluxo de trabalho de orquestração programado
Para programar execuções do fluxo de trabalho do Dataform, use o Workflows para criar um fluxo de trabalho de orquestração e adicione um job do Cloud Scheduler como um gatilho.
O Workflows usa contas de serviço para dar acesso aos fluxos de trabalho aos recursos do Google Cloud . Crie uma conta de serviço e conceda a ela o papel de Editor do Dataform (
roles/dataform.editor
) do Identity and Access Management, além das permissões mínimas necessárias para gerenciar seu fluxo de trabalho de orquestração. Para mais informações, consulte Conceder permissão a um fluxo de trabalho para acessar recursos do Google Cloud .Crie um fluxo de trabalho de orquestração e use o seguinte código-fonte YAML como definição do fluxo de trabalho:
main: steps: - init: assign: - repository: projects/PROJECT_ID/locations/REPOSITORY_LOCATION/repositories/REPOSITORY_ID - createCompilationResult: call: http.post args: url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"} auth: type: OAuth2 body: gitCommitish: GIT_COMMITISH result: compilationResult - createWorkflowInvocation: call: http.post args: url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"} auth: type: OAuth2 body: compilationResult: ${compilationResult.body.name} result: workflowInvocation - complete: return: ${workflowInvocation.body.name}
Substitua:
- PROJECT_ID: o ID do seu projeto do Google Cloud .
- REPOSITORY_LOCATION: o local do repositório do Dataform.
- REPOSITORY_ID: o nome do seu repositório do Dataform.
- GIT_COMMITISH: a ramificação do Git em que você quer executar o código do Dataform. Para um repositório recém-criado, substitua por
main
.
Programe o fluxo de trabalho de orquestração usando o Cloud Scheduler.
Personalizar a solicitação de resultado da compilação de criação do fluxo de trabalho do Dataform
É possível atualizar o fluxo de trabalho de orquestração atual e definir as configurações de solicitação de resultado da compilação de criação do fluxo de trabalho do Dataform no formato YAML. Para mais informações sobre as configurações, consulte a
referência do recurso REST projects.locations.repositories.compilationResults
.
Por exemplo, para adicionar uma configuração _dev
schemaSuffix
a todas as ações durante a compilação,
substitua o corpo da etapa createCompilationResult
pelo seguinte snippet de código:
- createCompilationResult:
call: http.post
args:
url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
auth:
type: OAuth2
body:
gitCommitish: GIT_COMMITISH
codeCompilationConfig:
schemaSuffix: dev
Também é possível transmitir outras configurações como argumentos de ambiente de execução em uma solicitação de execução do Workflows e acessar esses argumentos usando variáveis. Para mais informações, consulte Transmitir argumentos de ambiente de execução em uma solicitação de execução.
Personalizar a solicitação de invocação do fluxo de trabalho do Dataform
É possível atualizar o fluxo de trabalho de orquestração atual e definir as configurações de solicitação de invocação do fluxo de trabalho do Dataform no formato YAML. Para mais informações sobre as configurações de solicitação de invocação, consulte a
referência do recurso REST projects.locations.repositories.workflowInvocations
.
Por exemplo, para executar apenas ações com a tag hourly
com todas as dependências transitivas incluídas, substitua o corpo createWorkflowInvocation
pelo seguinte snippet de código:
- createWorkflowInvocation:
call: http.post
args:
url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
auth:
type: OAuth2
body:
compilationResult: ${compilationResult.body.name}
invocationConfig:
includedTags:
- hourly
transitiveDependenciesIncluded: true
Também é possível transmitir outras configurações como argumentos de ambiente de execução em uma solicitação de execução do Workflows e acessar esses argumentos usando variáveis. Para mais informações, consulte Transmitir argumentos de ambiente de execução em uma solicitação de execução.
Programar execuções com o Cloud Composer
Use o Cloud Composer 2 para programar execuções do Dataform. O Dataform não é compatível com o Cloud Composer 1.
Para gerenciar programações de execuções do Dataform com o Cloud Composer 2, use os operadores do Dataform em gráficos acíclicos dirigidos (DAGs) do Airflow. É possível criar um DAG do Airflow que programa invocações de fluxo de trabalho do Dataform.
O Dataform oferece vários operadores do Airflow. Isso inclui operadores para receber um resultado de compilação, uma invocação de fluxo de trabalho e cancelar uma invocação de fluxo de trabalho. Para conferir a lista completa de operadores do Dataform Airflow disponíveis, consulte Operadores do Google Dataform.
Instale o pacote google-cloud-dataform
do PyPi
Se você usa as versões 2.0.25
e mais recentes do Cloud Composer 2, esse pacote
é pré-instalado no seu ambiente. Não é necessário instalar nada.
Se você usa versões anteriores do Cloud Composer 2, instale o pacote google-cloud-dataform
do PyPi.
Na seção "Pacotes PyPI", especifique a versão ==0.2.0
.
Criar um DAG do Airflow que programa invocações de fluxo de trabalho do Dataform
Para gerenciar execuções programadas de fluxos de trabalho do Dataform com o Cloud Composer 2, escreva o DAG usando os operadores do Dataform Airflow e faça upload dele para o bucket do seu ambiente.
O exemplo de código a seguir mostra um DAG do Airflow que cria um resultado de compilação do Dataform e inicia uma invocação de fluxo de trabalho do Dataform:
from datetime import datetime
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
)
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
},
)
create_compilation_result >> create_workflow_invocation
Substitua:
- PROJECT_ID: o ID do projeto do Dataform Google Cloud .
- REPOSITORY_ID: o nome do seu repositório do Dataform.
- REGION: a região em que o repositório do Dataform está localizado.
- COMPILATION_RESULT: o nome do resultado da compilação que você quer usar para essa invocação de fluxo de trabalho.
- GIT_COMMITISH: o commitish do Git no repositório Git remoto da versão do código que você quer usar, por exemplo, uma ramificação ou um SHA do Git.
O exemplo de código a seguir mostra um DAG do Airflow que realiza as seguintes ações:
- Cria um resultado de compilação do Dataform.
- Inicia uma invocação assíncrona do fluxo de trabalho do Dataform.
- Pesquisa o status do fluxo de trabalho até que ele entre no estado esperado
usando
DataformWorkflowInvocationStateSensor
.
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
asynchronous=True,
workflow_invocation={
"compilation_result": COMPILATION_RESULT
}
)
is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
task_id="is_workflow_invocation_done",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)
create_compilation_result >> create_workflow_invocation
Substitua:
- PROJECT_ID: seu Dataform Google Cloud projectID.
- REPOSITORY_ID: o nome do seu repositório do Dataform.
- REGION: a região em que o repositório do Dataform está localizado.
- COMPILATION_RESULT: o nome do resultado da compilação que você quer usar para essa invocação de fluxo de trabalho.
- GIT_COMMITISH: o commitish do Git no repositório Git remoto da versão do código que você quer usar, por exemplo, uma ramificação ou um SHA do Git.
- COMPILATION_RESULT: o nome do resultado da compilação que você quer usar para essa invocação de fluxo de trabalho.
Adicionar parâmetros de configuração de compilação
É possível adicionar outros parâmetros de configuração de compilação ao objeto DAG do Airflow create_compilation_result
. Para mais informações sobre os parâmetros disponíveis, consulte a
referência da API Dataform CodeCompilationConfig
.
Para adicionar parâmetros de configuração de compilação ao objeto DAG do Airflow
create_compilation_result
, adicione os parâmetros selecionados ao campocode_compilation_config
no seguinte formato:create_compilation_result = DataformCreateCompilationResultOperator( task_id="create_compilation_result", project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, compilation_result={ "git_commitish": GIT_COMMITISH, "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"} }, )
Substitua:
- PROJECT_ID: o ID do projeto do Dataform Google Cloud .
- REPOSITORY_ID: o nome do seu repositório do Dataform.
- REGION: a região em que o repositório do Dataform está localizado.
- GIT_COMMITISH: o commitish do Git no repositório Git remoto da versão do código que você quer usar, por exemplo, uma ramificação ou um SHA do Git.
- PARAMETER: o parâmetro
CodeCompilationConfig
selecionado. É possível adicionar vários parâmetros. - PARAMETER_VALUE: o valor do parâmetro selecionado.
O exemplo de código a seguir mostra o parâmetro defaultDatabase
adicionado ao
objeto DAG do Airflow create_compilation_result
:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": REMOTE_BRANCH,
"code_compilation_config": { "default_database": "my-custom-gcp-project"}
},
)
Adicionar parâmetros de configuração de invocação do fluxo de trabalho
É possível adicionar outros parâmetros de configuração de invocação de fluxo de trabalho ao objeto DAG do Airflow create_workflow_invocation
. Para mais informações sobre os parâmetros disponíveis, consulte a
referência da API Dataform InvocationConfig
.
Para adicionar parâmetros de configuração de invocação de fluxo de trabalho ao objeto DAG do Airflow
create_workflow_invocation
, adicione os parâmetros selecionados ao campoinvocation_config
no seguinte formato:create_workflow_invocation = DataformCreateWorkflowInvocationOperator( task_id='create_workflow_invocation', project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, workflow_invocation={ "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}", "invocation_config": { "PARAMETER": PARAMETER_VALUE } }, )
Substitua:
- PROJECT_ID: o ID do projeto do Dataform Google Cloud .
- REPOSITORY_ID: o nome do seu repositório do Dataform.
- REGION: a região em que o repositório do Dataform está localizado.
- PARAMETER: o parâmetro
InvocationConfig
selecionado. É possível adicionar vários parâmetros. - PARAMETER_VALUE: o valor do parâmetro selecionado.
O exemplo de código a seguir mostra os parâmetros includedTags[]
e transitiveDependenciesIncluded
adicionados ao objeto DAG do Airflow create_workflow_invocation
:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "included_tags": ["daily"], "transitive_dependencies_included": true }
},
)
A seguir
- Para saber como configurar as configurações de versão de compilação do Dataform, consulte Criar uma configuração de versão.
- Para saber mais sobre o ciclo de vida do código no Dataform, consulte Introdução ao ciclo de vida do código no Dataform.
- Para saber mais sobre a API Dataform, consulte API Dataform.
- Para saber mais sobre os ambientes do Cloud Composer, consulte Visão geral do Cloud Composer.
- Para saber mais sobre os preços do Workflows, consulte Preços do Workflows.