Agende execuções

Este documento mostra como fazer o seguinte no Dataform:

Antes de começar

Para programar execuções com configurações de fluxo de trabalho ou programar execuções com fluxos de trabalho e o Cloud Scheduler, faça o seguinte:

  1. Na Google Cloud consola, aceda à página Dataform.

    Aceder ao Dataform

  2. Selecione ou crie um repositório.

  3. Crie uma configuração de lançamento.

Para programar execuções com o Cloud Composer, faça o seguinte:

  1. Selecione ou crie um repositório do Dataform.
  2. Conceda acesso do Dataform ao BigQuery.
  3. Selecione ou crie um espaço de trabalho do Dataform.
  4. Crie, pelo menos, uma tabela.
  5. Crie um ambiente do Cloud Composer 2.

Funções necessárias

Para receber as autorizações de que precisa para concluir as tarefas neste documento, peça ao seu administrador que lhe conceda as seguintes funções da IAM:

Para mais informações sobre a atribuição de funções, consulte o artigo Faça a gestão do acesso a projetos, pastas e organizações.

Também pode conseguir as autorizações necessárias através de funções personalizadas ou outras funções predefinidas.

Para usar uma conta de serviço diferente da conta de serviço do Dataform predefinida, conceda acesso à conta de serviço personalizada.

Para ativar as execuções agendadas de uma configuração de fluxo de trabalho quando o modo de atuação rigoroso está ativado, tem de conceder a autorização iam.serviceAccounts.actAs à conta de serviço do Dataform para a conta de serviço usada na configuração do fluxo de trabalho. Esta autorização está disponível na função de utilizador da conta de serviço (roles/iam.serviceAccountUser).

Para usar as credenciais de utilizador da Conta Google ao criar uma configuração do fluxo de trabalho (pré-visualização), conceda acesso à Conta Google.

Agende execuções com configurações de fluxo de trabalho

Esta secção mostra como criar uma configuração de fluxo de trabalho no Dataform para agendar e configurar execuções de fluxos de trabalho. Pode usar configurações de fluxo de trabalho para executar fluxos de trabalho do Dataform de acordo com um horário.

Acerca das configurações de fluxos de trabalho

Para agendar execuções do Dataform de todas ou de ações selecionadas do fluxo de trabalho no BigQuery, pode criar configurações de fluxo de trabalho. Numa configuração do fluxo de trabalho, seleciona uma configuração de lançamento de compilação, seleciona ações do fluxo de trabalho para execução e define a programação de execução.

Em seguida, durante uma execução agendada da configuração do fluxo de trabalho, o Dataform implementa a sua seleção de ações a partir do resultado da compilação mais recente na configuração de lançamento para o BigQuery. Também pode acionar manualmente a execução de uma configuração de fluxo de trabalho com a API Dataform workflowConfigs.

Uma configuração do fluxo de trabalho do Dataform contém as seguintes definições de execução:

  • ID da configuração do fluxo de trabalho.
  • Configuração de lançamento.
  • Conta de serviço.

    Esta é a conta de serviço associada à configuração do fluxo de trabalho. Pode selecionar a conta de serviço do Dataform predefinida ou uma conta de serviço associada ao seu Google Cloud projeto, ou pode introduzir manualmente uma conta de serviço diferente. Por predefinição, as configurações do fluxo de trabalho usam as mesmas contas de serviço que os respetivos repositórios.

    As credenciais da conta de serviço são o método de autorização predefinido para a criação e as execuções de configurações de fluxo de trabalho agendadas.

  • Credenciais de utilizador da Conta Google (pré-visualização)

    As credenciais de utilizador da Conta Google são o método de autorização predefinido para a criação e as execuções de configuração de fluxo de trabalho manuais e não agendadas. Para mais informações, consulte o artigo Autorize a sua Conta Google.

  • Ações do fluxo de trabalho a executar:

    • Todas as ações.
    • Seleção de ações.
    • Seleção de etiquetas.
  • Programação da execução e fuso horário.

Crie uma configuração do fluxo de trabalho

Para criar uma configuração de fluxo de trabalho do Dataform, siga estes passos:

  1. No repositório, aceda a Lançamentos e agendamento.
  2. Na secção Configurações do fluxo de trabalho, clique em Criar.
  3. No painel Criar configuração do fluxo de trabalho, no campo ID da configuração, introduza um ID exclusivo para a configuração do fluxo de trabalho.

    Os IDs só podem incluir números, letras, hífenes e sublinhados.

  4. No menu Configuração de lançamento, selecione uma configuração de lançamento de compilação.

  5. Na secção Autenticação, autorize a configuração do fluxo de trabalho com as credenciais de utilizador da sua Conta Google ou uma conta de serviço.

    • Para usar as credenciais de utilizador da sua Conta Google (Pré-visualização), selecione Executar com as minhas credenciais de utilizador.
    • Para usar uma conta de serviço, selecione Executar com a conta de serviço selecionada e, de seguida, selecione a conta de serviço do Dataform predefinida ou qualquer conta de serviço associada ao seu Google Cloud projeto ao qual tem acesso. Se não selecionar uma conta de serviço, a configuração do fluxo de trabalho usa a conta de serviço do repositório.
  6. Opcional: no campo Frequência da programação, introduza a frequência de execuções no formato unix-cron.

    Para verificar se o Dataform executa o resultado da compilação mais recente na configuração de lançamento correspondente, mantenha um intervalo mínimo de uma hora entre a hora de criação do resultado da compilação e a hora de execução agendada.

  7. Opcional: no menu Fuso horário, selecione o fuso horário para as execuções.

    O fuso horário predefinido é UTC.

  8. Selecione as ações do fluxo de trabalho a executar:

    • Para executar todo o fluxo de trabalho, clique em Todas as ações.
    • Para executar ações selecionadas no fluxo de trabalho, clique em Seleção de ações e, de seguida, selecione ações.
    • Para executar ações com etiquetas selecionadas, clique em Seleção de etiquetas e, em seguida, selecione etiquetas.
    • Opcional: para executar ações ou etiquetas selecionadas e as respetivas dependências, selecione a opção Incluir dependências.
    • Opcional: para executar ações ou etiquetas selecionadas e os respetivos dependentes, selecione a opção Incluir dependentes.
    • Opcional: para reconstruir todas as tabelas de raiz, selecione a opção Executar com atualização completa.

    Sem esta opção, o Dataform atualiza as tabelas incrementais sem as reconstruir de raiz.

  9. Clique em Criar. Se selecionou Executar com as minhas credenciais de utilizador para o seu método de autenticação, tem de autorizar a sua Conta Google (Pré-visualização).

Por exemplo, a seguinte configuração do fluxo de trabalho executa ações com a etiqueta hourly a cada hora no fuso horário CEST:

  • ID da configuração: production-hourly
  • Configuração do lançamento: -
  • Frequência: 0 * * * *
  • Fuso horário: Central European Summer Time (CEST)
  • Seleção de ações do fluxo de trabalho: seleção de etiquetas, etiqueta hourly

Autorize a sua Conta Google

Para autenticar o recurso com as credenciais de utilizador da sua Conta Google, tem de conceder manualmente autorização aos pipelines do BigQuery para obterem a chave de acesso da sua Conta Google e acederem aos dados de origem em seu nome. Pode conceder aprovação manual com a interface da caixa de diálogo do OAuth.

Só tem de conceder autorização aos pipelines do BigQuery uma vez.

Para revogar a autorização que concedeu, siga estes passos:

  1. Aceda à página da Conta Google.
  2. Clique em Pipelines do BigQuery.
  3. Clique em Remover acesso.

A alteração do proprietário da configuração do fluxo de trabalho através da atualização das credenciais também requer aprovação manual se o novo proprietário da Conta Google nunca tiver criado uma configuração do fluxo de trabalho antes.

Edite uma configuração de fluxo de trabalho

Para editar uma configuração de fluxo de trabalho, siga estes passos:

  1. No repositório, aceda a Lançamentos e agendamento.
  2. Na configuração do fluxo de trabalho que quer editar, clique no menu Mais e, de seguida, em Editar.
  3. No painel Editar configuração do fluxo de trabalho, edite as definições de configuração de lançamento e, de seguida, clique em Guardar.

Elimine uma configuração de fluxo de trabalho

Para eliminar uma configuração de fluxo de trabalho, siga estes passos:

  1. No repositório, aceda a Lançamentos e agendamento.
  2. Junto à configuração do fluxo de trabalho que quer eliminar, clique no menu Mais e, de seguida, clique em Eliminar.
  3. Na caixa de diálogo Eliminar configuração de lançamento, clique em Eliminar.

Agende execuções com fluxos de trabalho e o Cloud Scheduler

Esta secção mostra como agendar execuções de fluxos de trabalho do Dataform usando fluxos de trabalho e o Cloud Scheduler.

Acerca das execuções de fluxos de trabalho agendadas

Pode definir a frequência das execuções do fluxo de trabalho do Dataform através da criação de uma tarefa do Cloud Scheduler que aciona um fluxo de trabalho do Workflows. Os fluxos de trabalho executam serviços num fluxo de trabalho de orquestração que define.

Os fluxos de trabalho executam o seu fluxo de trabalho do Dataform num processo de dois passos. Primeiro, extrai o código do repositório do Dataform do seu fornecedor de Git e compila-o num resultado de compilação. Em seguida, usa o resultado da compilação para criar um fluxo de trabalho do Dataform e executá-lo na frequência que definir.

Crie um fluxo de trabalho de orquestração agendado

Para agendar execuções do fluxo de trabalho do Dataform, use os fluxos de trabalho para criar um fluxo de trabalho de orquestração e adicione uma tarefa do Cloud Scheduler como acionador.

  1. Os fluxos de trabalho usam contas de serviço para dar aos fluxos de trabalho acesso aGoogle Cloud recursos. Crie uma conta de serviço e conceda-lhe a função de gestão de identidade e de acesso (IAM) de editor do Dataform (roles/dataform.editor), bem como as autorizações mínimas necessárias para gerir o seu fluxo de trabalho de orquestração. Para mais informações, consulte o artigo Conceda uma autorização de fluxo de trabalho para aceder a Google Cloud recursos.

  2. Crie um fluxo de trabalho de orquestração e use o seguinte código fonte YAML como definição do fluxo de trabalho:

    main:
        steps:
        - init:
            assign:
            - repository: projects/PROJECT_ID/locations/REPOSITORY_LOCATION/repositories/REPOSITORY_ID
        - createCompilationResult:
            call: http.post
            args:
                url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
                auth:
                    type: OAuth2
                body:
                    gitCommitish: GIT_COMMITISH
            result: compilationResult
        - createWorkflowInvocation:
            call: http.post
            args:
                url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
                auth:
                    type: OAuth2
                body:
                    compilationResult: ${compilationResult.body.name}
            result: workflowInvocation
        - complete:
            return: ${workflowInvocation.body.name}
    

    Substitua o seguinte:

    • PROJECT_ID: o ID do seu projeto Google Cloud .
    • REPOSITORY_LOCATION: a localização do seu repositório do Dataform.
    • REPOSITORY_ID: o nome do seu repositório do Dataform.
    • GIT_COMMITISH: o ramo do Git a partir do qual quer executar o código do Dataform. Para um repositório criado recentemente, substitua por main.
  3. Agende o fluxo de trabalho de orquestração com o Cloud Scheduler.

Personalize o pedido de resultado da compilação de criação do fluxo de trabalho do Dataform

Pode atualizar o fluxo de trabalho de orquestração existente e definir as definições do pedido de resultado da compilação de criação do fluxo de trabalho do Dataform no formato YAML. Para mais informações sobre as definições, consulte a projects.locations.repositories.compilationResults referência de recursos REST.

Por exemplo, para adicionar uma definição _dev schemaSuffix a todas as ações durante a compilação, substitua o corpo do passo createCompilationResult pelo seguinte fragmento do código:

    - createCompilationResult:
        call: http.post
        args:
            url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
            auth:
                type: OAuth2
            body:
                gitCommitish: GIT_COMMITISH
                codeCompilationConfig:
                    schemaSuffix: dev

Também pode transmitir definições adicionais como argumentos de tempo de execução num pedido de execução de fluxos de trabalho e aceder a esses argumentos através de variáveis. Para mais informações, consulte o artigo Transmita argumentos de tempo de execução num pedido de execução.

Personalize o pedido de invocação do fluxo de trabalho do Dataform

Pode atualizar o fluxo de trabalho de orquestração existente e definir as definições do pedido de invocação do fluxo de trabalho do Dataform no formato YAML. Para mais informações sobre as definições do pedido de invocação, consulte a projects.locations.repositories.workflowInvocations referência de recursos REST.

Por exemplo, para executar apenas ações com a etiqueta hourly com todas as dependências transitivas incluídas, substitua o corpo createWorkflowInvocation pelo seguinte fragmento do código:

    - createWorkflowInvocation:
        call: http.post
        args:
            url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
            auth:
                type: OAuth2
            body:
                compilationResult: ${compilationResult.body.name}
                invocationConfig:
                    includedTags:
                    - hourly
                    transitiveDependenciesIncluded: true
                

Também pode transmitir definições adicionais como argumentos de tempo de execução num pedido de execução de fluxos de trabalho e aceder a esses argumentos através de variáveis. Para mais informações, consulte o artigo Transmita argumentos de tempo de execução num pedido de execução.

Agende execuções com o Cloud Composer

Pode usar o Cloud Composer 2 para agendar execuções do Dataform. O Dataform não suporta o Cloud Composer 1.

Para gerir agendamentos de execuções do Dataform com o Cloud Composer 2, pode usar operadores do Dataform em gráficos acíclicos dirigidos (DAGs) do Airflow. Pode criar um DAG do Airflow que agende invocações do fluxo de trabalho do Dataform.

O Dataform fornece vários operadores do Airflow. Estes incluem operadores para obter um resultado de compilação, obter uma invocação de fluxo de trabalho e cancelar uma invocação de fluxo de trabalho. Para ver a lista completa de operadores do Dataform Airflow disponíveis, consulte os operadores do Google Dataform.

Instale o pacote google-cloud-dataform PyPi

Se usar as versões 2.0.25 e posteriores do Cloud Composer 2, este pacote está pré-instalado no seu ambiente. Não precisa de o instalar.

Se usar versões anteriores do Cloud Composer 2, instale o google-cloud-dataformpacote PyPi.

Na secção de pacotes PyPI, especifique a versão ==0.2.0.

Crie um DAG do Airflow que agende invocações do fluxo de trabalho do Dataform

Para gerir execuções agendadas de fluxos de trabalho do Dataform com o Cloud Composer 2, escreva o DAG usando os operadores do Dataform Airflow, depois carregue-o para o contentor do seu ambiente.

O exemplo de código seguinte mostra um DAG do Airflow que cria um resultado de compilação do Dataform e inicia uma invocação do fluxo de trabalho do Dataform:

from datetime import datetime

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
         workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
        },
    )


create_compilation_result >> create_workflow_invocation

Substitua o seguinte:

  • PROJECT_ID: o ID do projeto do Dataform Google Cloud .
  • REPOSITORY_ID: o nome do seu repositório do Dataform.
  • REGION: a região em que o repositório do Dataform está localizado.
  • COMPILATION_RESULT: o nome do resultado da compilação que quer usar para esta invocação do fluxo de trabalho.
  • GIT_COMMITISH: o commitish do Git no repositório Git remoto da versão do código que quer usar, por exemplo, um ramo ou um SHA do Git.

O seguinte exemplo de código mostra um DAG do Airflow que faz o seguinte:

  1. Cria um resultado da compilação do Dataform.
  2. Inicia uma invocação de fluxo de trabalho do Dataform assíncrona.
  3. Consulta o estado do fluxo de trabalho até entrar no estado esperado usando DataformWorkflowInvocationStateSensor.
from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id='create_workflow_invocation',
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": COMPILATION_RESULT
    }
)

is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
    task_id="is_workflow_invocation_done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
    expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)


create_compilation_result >> create_workflow_invocation

Substitua o seguinte:

  • PROJECT_ID: o seu Google Cloud projectID do Dataform.
  • REPOSITORY_ID: o nome do seu repositório do Dataform.
  • REGION: a região em que o repositório do Dataform está localizado.
  • COMPILATION_RESULT: o nome do resultado da compilação que quer usar para esta invocação do fluxo de trabalho.
  • GIT_COMMITISH: o commitish do Git no repositório Git remoto da versão do código que quer usar, por exemplo, um ramo ou um SHA do Git.
  • COMPILATION_RESULT: o nome do resultado da compilação que quer usar para esta invocação do fluxo de trabalho.

Adicione parâmetros de configuração de compilação

Pode adicionar parâmetros de configuração de compilação adicionais ao objeto DAG do Airflow.create_compilation_result Para mais informações sobre os parâmetros disponíveis, consulte a CodeCompilationConfig referência da API Dataform.

  • Para adicionar parâmetros de configuração de compilação ao objeto DAG do Airflow, adicione os parâmetros selecionados ao campo no seguinte formato:create_compilation_resultcode_compilation_config

        create_compilation_result = DataformCreateCompilationResultOperator(
            task_id="create_compilation_result",
            project_id=PROJECT_ID,
            region=REGION,
            repository_id=REPOSITORY_ID,
            compilation_result={
                "git_commitish": GIT_COMMITISH,
                "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
            },
        )
    

    Substitua o seguinte:

    • PROJECT_ID: o ID do projeto do Dataform Google Cloud .
    • REPOSITORY_ID: o nome do seu repositório do Dataform.
    • REGION: a região em que o repositório do Dataform está localizado.
    • GIT_COMMITISH: o commitish do Git no repositório Git remoto da versão do código que quer usar, por exemplo, um ramo ou um SHA do Git.
    • PARAMETER: o parâmetro CodeCompilationConfig selecionado. Pode adicionar vários parâmetros.
    • PARAMETER_VALUE: o valor do parâmetro selecionado.

O seguinte exemplo de código mostra o parâmetro defaultDatabase adicionado ao objeto DAG do create_compilation_result Airflow:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": REMOTE_BRANCH,
            "code_compilation_config": { "default_database": "my-custom-gcp-project"}
        },
    )

Adicione parâmetros de configuração de invocação do fluxo de trabalho

Pode adicionar parâmetros de configuração de invocação do fluxo de trabalho adicionais ao objeto DAG do Airflow.create_workflow_invocation Para mais informações sobre os parâmetros disponíveis, consulte a InvocationConfig referência da API Dataform.

  • Para adicionar parâmetros de configuração de invocação do fluxo de trabalho ao objeto DAG do Airflow, adicione os parâmetros selecionados ao campo invocation_config no seguinte formato:create_workflow_invocation

        create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
            task_id='create_workflow_invocation',
            project_id=PROJECT_ID,
            region=REGION,
            repository_id=REPOSITORY_ID,
            workflow_invocation={
                "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
                "invocation_config": { "PARAMETER": PARAMETER_VALUE }
            },
        )
    
    

    Substitua o seguinte:

    • PROJECT_ID: o ID do projeto do Dataform Google Cloud .
    • REPOSITORY_ID: o nome do seu repositório do Dataform.
    • REGION: a região em que o repositório do Dataform está localizado.
    • PARAMETER: o parâmetro InvocationConfig selecionado. Pode adicionar vários parâmetros.
    • PARAMETER_VALUE: o valor do parâmetro selecionado.

O seguinte exemplo de código mostra os parâmetros includedTags[] e transitiveDependenciesIncluded adicionados ao objeto DAG do Airflow:create_workflow_invocation

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            "invocation_config": { "included_tags": ["daily"], "transitive_dependencies_included": true }
        },
    )

O que se segue?