Este documento mostra como fazer o seguinte no Dataform:
- Agende execuções com configurações de fluxo de trabalho.
- Agende execuções com os fluxos de trabalho e o Cloud Scheduler.
- Agende execuções com o Cloud Composer.
Antes de começar
Para programar execuções com configurações de fluxo de trabalho ou programar execuções com fluxos de trabalho e o Cloud Scheduler, faça o seguinte:
Na Google Cloud consola, aceda à página Dataform.
Selecione ou crie um repositório.
Crie uma configuração de lançamento.
Para programar execuções com o Cloud Composer, faça o seguinte:
- Selecione ou crie um repositório do Dataform.
- Conceda acesso do Dataform ao BigQuery.
- Selecione ou crie um espaço de trabalho do Dataform.
- Crie, pelo menos, uma tabela.
- Crie um ambiente do Cloud Composer 2.
Funções necessárias
Para receber as autorizações de que precisa para concluir as tarefas neste documento, peça ao seu administrador que lhe conceda as seguintes funções da IAM:
-
Administrador do Dataform (
roles/dataform.admin
) em repositórios -
Composer Worker (
roles/composer.worker
) na conta de serviço do ambiente do Cloud Composer
Para mais informações sobre a atribuição de funções, consulte o artigo Faça a gestão do acesso a projetos, pastas e organizações.
Também pode conseguir as autorizações necessárias através de funções personalizadas ou outras funções predefinidas.
Para usar uma conta de serviço diferente da conta de serviço do Dataform predefinida, conceda acesso à conta de serviço personalizada.
Para ativar as execuções agendadas de uma configuração de fluxo de trabalho quando o modo de atuação rigoroso está ativado, tem de conceder a autorização iam.serviceAccounts.actAs
à conta de serviço do Dataform para a conta de serviço usada na configuração do fluxo de trabalho. Esta autorização está disponível na função de utilizador da conta de serviço (roles/iam.serviceAccountUser
).
Para usar as credenciais de utilizador da Conta Google ao criar uma configuração do fluxo de trabalho (pré-visualização), conceda acesso à Conta Google.
Agende execuções com configurações de fluxo de trabalho
Esta secção mostra como criar uma configuração de fluxo de trabalho no Dataform para agendar e configurar execuções de fluxos de trabalho. Pode usar configurações de fluxo de trabalho para executar fluxos de trabalho do Dataform de acordo com um horário.
Acerca das configurações de fluxos de trabalho
Para agendar execuções do Dataform de todas ou de ações selecionadas do fluxo de trabalho no BigQuery, pode criar configurações de fluxo de trabalho. Numa configuração do fluxo de trabalho, seleciona uma configuração de lançamento de compilação, seleciona ações do fluxo de trabalho para execução e define a programação de execução.
Em seguida, durante uma execução agendada da configuração do fluxo de trabalho, o Dataform implementa a sua seleção de ações a partir do resultado da compilação mais recente na configuração de lançamento para o BigQuery. Também pode acionar manualmente a execução de uma configuração de fluxo de trabalho com a API Dataform workflowConfigs.
Uma configuração do fluxo de trabalho do Dataform contém as seguintes definições de execução:
- ID da configuração do fluxo de trabalho.
- Configuração de lançamento.
Conta de serviço.
Esta é a conta de serviço associada à configuração do fluxo de trabalho. Pode selecionar a conta de serviço do Dataform predefinida ou uma conta de serviço associada ao seu Google Cloud projeto, ou pode introduzir manualmente uma conta de serviço diferente. Por predefinição, as configurações do fluxo de trabalho usam as mesmas contas de serviço que os respetivos repositórios.
As credenciais da conta de serviço são o método de autorização predefinido para a criação e as execuções de configurações de fluxo de trabalho agendadas.
Credenciais de utilizador da Conta Google (pré-visualização)
As credenciais de utilizador da Conta Google são o método de autorização predefinido para a criação e as execuções de configuração de fluxo de trabalho manuais e não agendadas. Para mais informações, consulte o artigo Autorize a sua Conta Google.
Ações do fluxo de trabalho a executar:
- Todas as ações.
- Seleção de ações.
- Seleção de etiquetas.
Programação da execução e fuso horário.
Crie uma configuração do fluxo de trabalho
Para criar uma configuração de fluxo de trabalho do Dataform, siga estes passos:
- No repositório, aceda a Lançamentos e agendamento.
- Na secção Configurações do fluxo de trabalho, clique em Criar.
No painel Criar configuração do fluxo de trabalho, no campo ID da configuração, introduza um ID exclusivo para a configuração do fluxo de trabalho.
Os IDs só podem incluir números, letras, hífenes e sublinhados.
No menu Configuração de lançamento, selecione uma configuração de lançamento de compilação.
Na secção Autenticação, autorize a configuração do fluxo de trabalho com as credenciais de utilizador da sua Conta Google ou uma conta de serviço.
- Para usar as credenciais de utilizador da sua Conta Google (Pré-visualização), selecione Executar com as minhas credenciais de utilizador.
- Para usar uma conta de serviço, selecione Executar com a conta de serviço selecionada e, de seguida, selecione a conta de serviço do Dataform predefinida ou qualquer conta de serviço associada ao seu Google Cloud projeto ao qual tem acesso. Se não selecionar uma conta de serviço, a configuração do fluxo de trabalho usa a conta de serviço do repositório.
Opcional: no campo Frequência da programação, introduza a frequência de execuções no formato unix-cron.
Para verificar se o Dataform executa o resultado da compilação mais recente na configuração de lançamento correspondente, mantenha um intervalo mínimo de uma hora entre a hora de criação do resultado da compilação e a hora de execução agendada.
Opcional: no menu Fuso horário, selecione o fuso horário para as execuções.
O fuso horário predefinido é UTC.
Selecione as ações do fluxo de trabalho a executar:
- Para executar todo o fluxo de trabalho, clique em Todas as ações.
- Para executar ações selecionadas no fluxo de trabalho, clique em Seleção de ações e, de seguida, selecione ações.
- Para executar ações com etiquetas selecionadas, clique em Seleção de etiquetas e, em seguida, selecione etiquetas.
- Opcional: para executar ações ou etiquetas selecionadas e as respetivas dependências, selecione a opção Incluir dependências.
- Opcional: para executar ações ou etiquetas selecionadas e os respetivos dependentes, selecione a opção Incluir dependentes.
- Opcional: para reconstruir todas as tabelas de raiz, selecione a opção Executar com atualização completa.
Sem esta opção, o Dataform atualiza as tabelas incrementais sem as reconstruir de raiz.
Clique em Criar. Se selecionou Executar com as minhas credenciais de utilizador para o seu método de autenticação, tem de autorizar a sua Conta Google (Pré-visualização).
Por exemplo, a seguinte configuração do fluxo de trabalho executa ações com a etiqueta hourly
a cada hora no fuso horário CEST:
- ID da configuração:
production-hourly
- Configuração do lançamento: -
- Frequência:
0 * * * *
- Fuso horário:
Central European Summer Time (CEST)
- Seleção de ações do fluxo de trabalho: seleção de etiquetas, etiqueta
hourly
Autorize a sua Conta Google
Para autenticar o recurso com as credenciais de utilizador da sua Conta Google, tem de conceder manualmente autorização aos pipelines do BigQuery para obterem a chave de acesso da sua Conta Google e acederem aos dados de origem em seu nome. Pode conceder aprovação manual com a interface da caixa de diálogo do OAuth.
Só tem de conceder autorização aos pipelines do BigQuery uma vez.
Para revogar a autorização que concedeu, siga estes passos:
- Aceda à página da Conta Google.
- Clique em Pipelines do BigQuery.
- Clique em Remover acesso.
A alteração do proprietário da configuração do fluxo de trabalho através da atualização das credenciais também requer aprovação manual se o novo proprietário da Conta Google nunca tiver criado uma configuração do fluxo de trabalho antes.
Edite uma configuração de fluxo de trabalho
Para editar uma configuração de fluxo de trabalho, siga estes passos:
- No repositório, aceda a Lançamentos e agendamento.
- Na configuração do fluxo de trabalho que quer editar, clique no menu Mais e, de seguida, em Editar.
- No painel Editar configuração do fluxo de trabalho, edite as definições de configuração de lançamento e, de seguida, clique em Guardar.
Elimine uma configuração de fluxo de trabalho
Para eliminar uma configuração de fluxo de trabalho, siga estes passos:
- No repositório, aceda a Lançamentos e agendamento.
- Junto à configuração do fluxo de trabalho que quer eliminar, clique no menu Mais e, de seguida, clique em Eliminar.
- Na caixa de diálogo Eliminar configuração de lançamento, clique em Eliminar.
Agende execuções com fluxos de trabalho e o Cloud Scheduler
Esta secção mostra como agendar execuções de fluxos de trabalho do Dataform usando fluxos de trabalho e o Cloud Scheduler.
Acerca das execuções de fluxos de trabalho agendadas
Pode definir a frequência das execuções do fluxo de trabalho do Dataform através da criação de uma tarefa do Cloud Scheduler que aciona um fluxo de trabalho do Workflows. Os fluxos de trabalho executam serviços num fluxo de trabalho de orquestração que define.
Os fluxos de trabalho executam o seu fluxo de trabalho do Dataform num processo de dois passos. Primeiro, extrai o código do repositório do Dataform do seu fornecedor de Git e compila-o num resultado de compilação. Em seguida, usa o resultado da compilação para criar um fluxo de trabalho do Dataform e executá-lo na frequência que definir.
Crie um fluxo de trabalho de orquestração agendado
Para agendar execuções do fluxo de trabalho do Dataform, use os fluxos de trabalho para criar um fluxo de trabalho de orquestração e adicione uma tarefa do Cloud Scheduler como acionador.
Os fluxos de trabalho usam contas de serviço para dar aos fluxos de trabalho acesso aGoogle Cloud recursos. Crie uma conta de serviço e conceda-lhe a função de gestão de identidade e de acesso (IAM) de editor do Dataform (
roles/dataform.editor
), bem como as autorizações mínimas necessárias para gerir o seu fluxo de trabalho de orquestração. Para mais informações, consulte o artigo Conceda uma autorização de fluxo de trabalho para aceder a Google Cloud recursos.Crie um fluxo de trabalho de orquestração e use o seguinte código fonte YAML como definição do fluxo de trabalho:
main: steps: - init: assign: - repository: projects/PROJECT_ID/locations/REPOSITORY_LOCATION/repositories/REPOSITORY_ID - createCompilationResult: call: http.post args: url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"} auth: type: OAuth2 body: gitCommitish: GIT_COMMITISH result: compilationResult - createWorkflowInvocation: call: http.post args: url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"} auth: type: OAuth2 body: compilationResult: ${compilationResult.body.name} result: workflowInvocation - complete: return: ${workflowInvocation.body.name}
Substitua o seguinte:
- PROJECT_ID: o ID do seu projeto Google Cloud .
- REPOSITORY_LOCATION: a localização do seu repositório do Dataform.
- REPOSITORY_ID: o nome do seu repositório do Dataform.
- GIT_COMMITISH: o ramo do Git a partir do qual quer executar o código do Dataform. Para um repositório criado recentemente, substitua por
main
.
Agende o fluxo de trabalho de orquestração com o Cloud Scheduler.
Personalize o pedido de resultado da compilação de criação do fluxo de trabalho do Dataform
Pode atualizar o fluxo de trabalho de orquestração existente e definir as definições do pedido de resultado da compilação de criação do fluxo de trabalho do Dataform no formato YAML. Para mais informações sobre as definições,
consulte a
projects.locations.repositories.compilationResults
referência de recursos REST.
Por exemplo, para adicionar uma definição _dev
schemaSuffix
a todas as ações durante a compilação, substitua o corpo do passo createCompilationResult
pelo seguinte fragmento do código:
- createCompilationResult:
call: http.post
args:
url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
auth:
type: OAuth2
body:
gitCommitish: GIT_COMMITISH
codeCompilationConfig:
schemaSuffix: dev
Também pode transmitir definições adicionais como argumentos de tempo de execução num pedido de execução de fluxos de trabalho e aceder a esses argumentos através de variáveis. Para mais informações, consulte o artigo Transmita argumentos de tempo de execução num pedido de execução.
Personalize o pedido de invocação do fluxo de trabalho do Dataform
Pode atualizar o fluxo de trabalho de orquestração existente e definir as definições do pedido de invocação do fluxo de trabalho do Dataform no formato YAML. Para mais informações sobre as definições do pedido de invocação,
consulte a
projects.locations.repositories.workflowInvocations
referência de recursos REST.
Por exemplo, para executar apenas ações com a etiqueta hourly
com todas as dependências transitivas incluídas, substitua o corpo createWorkflowInvocation
pelo seguinte fragmento do código:
- createWorkflowInvocation:
call: http.post
args:
url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
auth:
type: OAuth2
body:
compilationResult: ${compilationResult.body.name}
invocationConfig:
includedTags:
- hourly
transitiveDependenciesIncluded: true
Também pode transmitir definições adicionais como argumentos de tempo de execução num pedido de execução de fluxos de trabalho e aceder a esses argumentos através de variáveis. Para mais informações, consulte o artigo Transmita argumentos de tempo de execução num pedido de execução.
Agende execuções com o Cloud Composer
Pode usar o Cloud Composer 2 para agendar execuções do Dataform. O Dataform não suporta o Cloud Composer 1.
Para gerir agendamentos de execuções do Dataform com o Cloud Composer 2, pode usar operadores do Dataform em gráficos acíclicos dirigidos (DAGs) do Airflow. Pode criar um DAG do Airflow que agende invocações do fluxo de trabalho do Dataform.
O Dataform fornece vários operadores do Airflow. Estes incluem operadores para obter um resultado de compilação, obter uma invocação de fluxo de trabalho e cancelar uma invocação de fluxo de trabalho. Para ver a lista completa de operadores do Dataform Airflow disponíveis, consulte os operadores do Google Dataform.
Instale o pacote google-cloud-dataform
PyPi
Se usar as versões 2.0.25
e posteriores do Cloud Composer 2, este pacote
está pré-instalado no seu ambiente. Não precisa de o instalar.
Se usar versões anteriores do Cloud Composer 2,
instale o google-cloud-dataform
pacote PyPi.
Na secção de pacotes PyPI, especifique a versão ==0.2.0
.
Crie um DAG do Airflow que agende invocações do fluxo de trabalho do Dataform
Para gerir execuções agendadas de fluxos de trabalho do Dataform com o Cloud Composer 2, escreva o DAG usando os operadores do Dataform Airflow, depois carregue-o para o contentor do seu ambiente.
O exemplo de código seguinte mostra um DAG do Airflow que cria um resultado de compilação do Dataform e inicia uma invocação do fluxo de trabalho do Dataform:
from datetime import datetime
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
)
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
},
)
create_compilation_result >> create_workflow_invocation
Substitua o seguinte:
- PROJECT_ID: o ID do projeto do Dataform Google Cloud .
- REPOSITORY_ID: o nome do seu repositório do Dataform.
- REGION: a região em que o repositório do Dataform está localizado.
- COMPILATION_RESULT: o nome do resultado da compilação que quer usar para esta invocação do fluxo de trabalho.
- GIT_COMMITISH: o commitish do Git no repositório Git remoto da versão do código que quer usar, por exemplo, um ramo ou um SHA do Git.
O seguinte exemplo de código mostra um DAG do Airflow que faz o seguinte:
- Cria um resultado da compilação do Dataform.
- Inicia uma invocação de fluxo de trabalho do Dataform assíncrona.
- Consulta o estado do fluxo de trabalho até entrar no estado esperado
usando
DataformWorkflowInvocationStateSensor
.
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
asynchronous=True,
workflow_invocation={
"compilation_result": COMPILATION_RESULT
}
)
is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
task_id="is_workflow_invocation_done",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)
create_compilation_result >> create_workflow_invocation
Substitua o seguinte:
- PROJECT_ID: o seu Google Cloud projectID do Dataform.
- REPOSITORY_ID: o nome do seu repositório do Dataform.
- REGION: a região em que o repositório do Dataform está localizado.
- COMPILATION_RESULT: o nome do resultado da compilação que quer usar para esta invocação do fluxo de trabalho.
- GIT_COMMITISH: o commitish do Git no repositório Git remoto da versão do código que quer usar, por exemplo, um ramo ou um SHA do Git.
- COMPILATION_RESULT: o nome do resultado da compilação que quer usar para esta invocação do fluxo de trabalho.
Adicione parâmetros de configuração de compilação
Pode adicionar parâmetros de configuração de compilação adicionais ao objeto DAG do Airflow.create_compilation_result
Para mais informações sobre os parâmetros disponíveis, consulte a CodeCompilationConfig
referência da API Dataform.
Para adicionar parâmetros de configuração de compilação ao objeto DAG do Airflow, adicione os parâmetros selecionados ao campo no seguinte formato:
create_compilation_result
code_compilation_config
create_compilation_result = DataformCreateCompilationResultOperator( task_id="create_compilation_result", project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, compilation_result={ "git_commitish": GIT_COMMITISH, "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"} }, )
Substitua o seguinte:
- PROJECT_ID: o ID do projeto do Dataform Google Cloud .
- REPOSITORY_ID: o nome do seu repositório do Dataform.
- REGION: a região em que o repositório do Dataform está localizado.
- GIT_COMMITISH: o commitish do Git no repositório Git remoto da versão do código que quer usar, por exemplo, um ramo ou um SHA do Git.
- PARAMETER: o parâmetro
CodeCompilationConfig
selecionado. Pode adicionar vários parâmetros. - PARAMETER_VALUE: o valor do parâmetro selecionado.
O seguinte exemplo de código mostra o parâmetro defaultDatabase
adicionado ao objeto DAG do create_compilation_result
Airflow:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": REMOTE_BRANCH,
"code_compilation_config": { "default_database": "my-custom-gcp-project"}
},
)
Adicione parâmetros de configuração de invocação do fluxo de trabalho
Pode adicionar parâmetros de configuração de invocação do fluxo de trabalho adicionais ao objeto DAG do Airflow.create_workflow_invocation
Para mais informações sobre os parâmetros disponíveis, consulte a InvocationConfig
referência da API Dataform.
Para adicionar parâmetros de configuração de invocação do fluxo de trabalho ao objeto DAG do Airflow, adicione os parâmetros selecionados ao campo
invocation_config
no seguinte formato:create_workflow_invocation
create_workflow_invocation = DataformCreateWorkflowInvocationOperator( task_id='create_workflow_invocation', project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, workflow_invocation={ "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}", "invocation_config": { "PARAMETER": PARAMETER_VALUE } }, )
Substitua o seguinte:
- PROJECT_ID: o ID do projeto do Dataform Google Cloud .
- REPOSITORY_ID: o nome do seu repositório do Dataform.
- REGION: a região em que o repositório do Dataform está localizado.
- PARAMETER: o parâmetro
InvocationConfig
selecionado. Pode adicionar vários parâmetros. - PARAMETER_VALUE: o valor do parâmetro selecionado.
O seguinte exemplo de código mostra os parâmetros includedTags[]
e transitiveDependenciesIncluded
adicionados ao objeto DAG do Airflow:create_workflow_invocation
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "included_tags": ["daily"], "transitive_dependencies_included": true }
},
)
O que se segue?
- Para saber como configurar as configurações de lançamento de compilação do Dataform, consulte o artigo Crie uma configuração de lançamento.
- Para saber mais sobre o ciclo de vida do código do Dataform, consulte o artigo Introdução ao ciclo de vida do código no Dataform.
- Para saber mais sobre a API Dataform, consulte o artigo API Dataform.
- Para saber mais sobre os ambientes do Cloud Composer, consulte o artigo Vista geral do Cloud Composer.
- Para saber mais sobre os preços do Workflows, consulte o artigo Preços do Workflows.