Programar execuções

Este documento mostra como fazer o seguinte no Dataform:

Antes de começar

Para programar execuções com configurações de fluxo de trabalho ou programar execuções com fluxos de trabalho e o Cloud Scheduler, faça o seguinte:

  1. No Google Cloud console, acesse a página Dataform.

    Acesse o Dataform

  2. Selecione ou crie um repositório.

  3. Crie uma configuração de versão.

Para programar execuções com o Cloud Composer, faça o seguinte:

  1. Selecione ou crie um repositório do Dataform.
  2. Conceder acesso ao Dataform ao BigQuery.
  3. Selecione ou crie um espaço de trabalho do Dataform.
  4. Crie pelo menos uma tabela.
  5. Crie um ambiente do Cloud Composer 2.

Funções exigidas

Para receber as permissões necessárias para concluir as tarefas neste documento, peça ao administrador para conceder a você os seguintes papéis do IAM:

Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Também é possível conseguir as permissões necessárias usando papéis personalizados ou outros papéis predefinidos.

Para usar uma conta de serviço diferente da conta de serviço padrão do Dataform, conceda acesso à conta de serviço personalizada.

Para ativar as execuções programadas de uma configuração de fluxo de trabalho quando o modo estrito de agir como está ativado, conceda a permissão iam.serviceAccounts.actAs à conta de serviço do Dataform para a conta de serviço usada na configuração do fluxo de trabalho. Essa permissão está disponível no papel de Usuário da conta de serviço (roles/iam.serviceAccountUser).

Para usar as credenciais de usuário da Conta do Google ao criar uma configuração de fluxo de trabalho (Prévia), conceda acesso à Conta do Google.

Programar execuções com configurações de fluxo de trabalho

Nesta seção, mostramos como criar uma configuração de fluxo de trabalho no Dataform para programar e configurar execuções de fluxo de trabalho. É possível usar configurações de fluxo de trabalho para executar fluxos de trabalho do Dataform de acordo com uma programação.

Sobre as configurações de fluxo de trabalho

Para programar execuções do Dataform de todas ou de algumas ações de fluxo de trabalho no BigQuery, crie configurações de fluxo de trabalho. Em uma configuração de fluxo de trabalho, você seleciona uma configuração de lançamento de compilação, escolhe ações de fluxo de trabalho para execução e define a programação de execução.

Em seguida, durante uma execução programada da configuração do fluxo de trabalho, o Dataform implanta sua seleção de ações do resultado da compilação mais recente na configuração de lançamento para o BigQuery. Também é possível acionar manualmente a execução de uma configuração de fluxo de trabalho com a API Dataform workflowConfigs.

Uma configuração de fluxo de trabalho do Dataform contém as seguintes configurações de execução:

  • ID da configuração do fluxo de trabalho.
  • Configuração de lançamento.
  • Conta de serviço.

    Essa é a conta de serviço associada à configuração do fluxo de trabalho. Você pode selecionar a conta de serviço padrão do Dataform ou uma conta associada ao seu projeto do Google Cloud . Também é possível inserir manualmente uma conta de serviço diferente. Por padrão, as configurações de fluxo de trabalho usam as mesmas contas de serviço que os repositórios.

    As credenciais da conta de serviço são o método de autorização padrão para criação e execução de configurações de fluxo de trabalho programado.

  • Credenciais de usuário da Conta do Google (prévia)

    As credenciais de usuário da Conta do Google são o método de autorização padrão para criações e execuções de configuração de fluxo de trabalho manuais e não programadas. Para mais informações, consulte Autorizar sua Conta do Google.

  • Ações do fluxo de trabalho a serem executadas:

    • Todas as ações.
    • Seleção de ações.
    • Seleção de tags.
  • Programação de execução e fuso horário.

Criar uma configuração de fluxo de trabalho

Para criar uma configuração de fluxo de trabalho do Dataform, siga estas etapas:

  1. No repositório, acesse Lançamentos e programação.
  2. Na seção Configurações do fluxo de trabalho, clique em Criar.
  3. No painel Criar configuração do fluxo de trabalho, no campo ID da configuração, insira um ID exclusivo para a configuração do fluxo de trabalho.

    Os IDs podem incluir apenas números, letras, hifens e sublinhados.

  4. No menu Configuração da versão, selecione uma configuração de versão de compilação.

  5. Na seção Autenticação, autorize a configuração do fluxo de trabalho com as credenciais de usuário da sua Conta do Google ou uma conta de serviço.

    • Para usar as credenciais de usuário da sua Conta do Google (Prévia), selecione Executar com minhas credenciais de usuário.
    • Para usar uma conta de serviço, selecione Executar com a conta de serviço selecionada e escolha a conta de serviço padrão do Dataform ou qualquer conta de serviço associada ao seu projetoGoogle Cloud a que você tem acesso. Se você não selecionar uma conta de serviço, a configuração do fluxo de trabalho usará a conta de serviço do repositório.
  6. Opcional: no campo Frequência de programação, insira a frequência de execuções no formato unix-cron.

    Para verificar se o Dataform executa o resultado da compilação mais recente na configuração de lançamento correspondente, mantenha um intervalo mínimo de uma hora entre o momento da criação do resultado da compilação e o momento da execução programada.

  7. Opcional: no menu Fuso horário, selecione o fuso horário das execuções.

    O fuso horário padrão é UTC.

  8. Selecione as ações do fluxo de trabalho a serem executadas:

    • Para executar todo o fluxo de trabalho, clique em Todas as ações.
    • Para executar as ações selecionadas no fluxo de trabalho, clique em Seleção de ações e escolha as ações.
    • Para executar ações com as tags selecionadas, clique em Seleção de tags e escolha as tags.
    • Opcional: para executar as ações ou tags selecionadas e as dependências delas, marque a opção Incluir dependências.
    • Opcional: para executar as ações ou tags selecionadas e os dependentes delas, selecione a opção Incluir dependentes.
    • Opcional: para recriar todas as tabelas do zero, selecione a opção Executar com atualização completa.

    Sem essa opção, o Dataform atualiza as tabelas incrementais sem recriá-las do zero.

  9. Clique em Criar. Se você selecionou Executar com minhas credenciais de usuário como método de autenticação, autorize sua Conta do Google (Prévia).

Por exemplo, a configuração de fluxo de trabalho a seguir executa ações com a tag hourly a cada hora no fuso horário CEST:

  • ID da configuração: production-hourly
  • Configuração da versão: -
  • Frequência: 0 * * * *
  • Fuso horário: Central European Summer Time (CEST)
  • Seleção de ações do fluxo de trabalho: seleção de tags, tag hourly

Autorizar sua Conta do Google

Para autenticar o recurso com as credenciais de usuário da sua Conta do Google, conceda permissão manualmente para que os pipelines do BigQuery recebam o token de acesso da sua Conta do Google e acessem os dados de origem em seu nome. É possível conceder aprovação manual com a interface da caixa de diálogo do OAuth.

Você só precisa conceder permissão aos pipelines do BigQuery uma vez.

Para revogar a permissão concedida, siga estas etapas:

  1. Acesse a página da sua Conta do Google.
  2. Clique em Pipelines do BigQuery.
  3. Clique em Remover acesso.

Alterar o proprietário da configuração do fluxo de trabalho atualizando as credenciais também requer aprovação manual se o novo proprietário da Conta do Google nunca tiver criado uma configuração de fluxo de trabalho antes.

Editar uma configuração de fluxo de trabalho

Para editar uma configuração de fluxo de trabalho, siga estas etapas:

  1. No repositório, acesse Lançamentos e programação.
  2. Na configuração do fluxo de trabalho que você quer editar, clique no menu Mais e em Editar.
  3. No painel Editar configuração do fluxo de trabalho, edite as configurações da configuração de versão e clique em Salvar.

Excluir uma configuração de fluxo de trabalho

Para excluir uma configuração de fluxo de trabalho, siga estas etapas:

  1. No repositório, acesse Lançamentos e programação.
  2. Na configuração do fluxo de trabalho que você quer excluir, clique no menu Mais e em Excluir.
  3. Na caixa de diálogo Excluir configuração da versão, clique em Excluir.

Programar execuções com o Workflows e o Cloud Scheduler

Nesta seção, mostramos como programar execuções de fluxos de trabalho do Dataform usando o Workflows e o Cloud Scheduler.

Sobre execuções de fluxo de trabalho programadas

É possível definir a frequência das execuções do fluxo de trabalho do Dataform criando um job do Cloud Scheduler que aciona um fluxo de trabalho do Workflows. O Workflows executa serviços em um fluxo de trabalho de orquestração definido por você.

O Workflows executa seu fluxo de trabalho do Dataform em um processo de duas etapas. Primeiro, ele extrai o código do repositório do Dataform do seu provedor Git e o compila em um resultado de compilação. Em seguida, ele usa o resultado da compilação para criar um fluxo de trabalho do Dataform e o executa na frequência definida.

Criar um fluxo de trabalho de orquestração programado

Para programar execuções do fluxo de trabalho do Dataform, use o Workflows para criar um fluxo de trabalho de orquestração e adicione um job do Cloud Scheduler como um gatilho.

  1. O Workflows usa contas de serviço para dar acesso aos fluxos de trabalho aos recursos do Google Cloud . Crie uma conta de serviço e conceda a ela o papel de Editor do Dataform (roles/dataform.editor) do Identity and Access Management, além das permissões mínimas necessárias para gerenciar seu fluxo de trabalho de orquestração. Para mais informações, consulte Conceder permissão a um fluxo de trabalho para acessar recursos do Google Cloud .

  2. Crie um fluxo de trabalho de orquestração e use o seguinte código-fonte YAML como definição do fluxo de trabalho:

    main:
        steps:
        - init:
            assign:
            - repository: projects/PROJECT_ID/locations/REPOSITORY_LOCATION/repositories/REPOSITORY_ID
        - createCompilationResult:
            call: http.post
            args:
                url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
                auth:
                    type: OAuth2
                body:
                    gitCommitish: GIT_COMMITISH
            result: compilationResult
        - createWorkflowInvocation:
            call: http.post
            args:
                url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
                auth:
                    type: OAuth2
                body:
                    compilationResult: ${compilationResult.body.name}
            result: workflowInvocation
        - complete:
            return: ${workflowInvocation.body.name}
    

    Substitua:

    • PROJECT_ID: o ID do seu projeto do Google Cloud .
    • REPOSITORY_LOCATION: o local do repositório do Dataform.
    • REPOSITORY_ID: o nome do seu repositório do Dataform.
    • GIT_COMMITISH: a ramificação do Git em que você quer executar o código do Dataform. Para um repositório recém-criado, substitua por main.
  3. Programe o fluxo de trabalho de orquestração usando o Cloud Scheduler.

Personalizar a solicitação de resultado da compilação de criação do fluxo de trabalho do Dataform

É possível atualizar o fluxo de trabalho de orquestração atual e definir as configurações de solicitação de resultado da compilação de criação do fluxo de trabalho do Dataform no formato YAML. Para mais informações sobre as configurações, consulte a referência do recurso REST projects.locations.repositories.compilationResults.

Por exemplo, para adicionar uma configuração _dev schemaSuffix a todas as ações durante a compilação, substitua o corpo da etapa createCompilationResult pelo seguinte snippet de código:

    - createCompilationResult:
        call: http.post
        args:
            url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
            auth:
                type: OAuth2
            body:
                gitCommitish: GIT_COMMITISH
                codeCompilationConfig:
                    schemaSuffix: dev

Também é possível transmitir outras configurações como argumentos de ambiente de execução em uma solicitação de execução do Workflows e acessar esses argumentos usando variáveis. Para mais informações, consulte Transmitir argumentos de ambiente de execução em uma solicitação de execução.

Personalizar a solicitação de invocação do fluxo de trabalho do Dataform

É possível atualizar o fluxo de trabalho de orquestração atual e definir as configurações de solicitação de invocação do fluxo de trabalho do Dataform no formato YAML. Para mais informações sobre as configurações de solicitação de invocação, consulte a referência do recurso REST projects.locations.repositories.workflowInvocations.

Por exemplo, para executar apenas ações com a tag hourly com todas as dependências transitivas incluídas, substitua o corpo createWorkflowInvocation pelo seguinte snippet de código:

    - createWorkflowInvocation:
        call: http.post
        args:
            url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
            auth:
                type: OAuth2
            body:
                compilationResult: ${compilationResult.body.name}
                invocationConfig:
                    includedTags:
                    - hourly
                    transitiveDependenciesIncluded: true
                

Também é possível transmitir outras configurações como argumentos de ambiente de execução em uma solicitação de execução do Workflows e acessar esses argumentos usando variáveis. Para mais informações, consulte Transmitir argumentos de ambiente de execução em uma solicitação de execução.

Programar execuções com o Cloud Composer

Use o Cloud Composer 2 para programar execuções do Dataform. O Dataform não é compatível com o Cloud Composer 1.

Para gerenciar programações de execuções do Dataform com o Cloud Composer 2, use os operadores do Dataform em gráficos acíclicos dirigidos (DAGs) do Airflow. É possível criar um DAG do Airflow que programa invocações de fluxo de trabalho do Dataform.

O Dataform oferece vários operadores do Airflow. Isso inclui operadores para receber um resultado de compilação, uma invocação de fluxo de trabalho e cancelar uma invocação de fluxo de trabalho. Para conferir a lista completa de operadores do Dataform Airflow disponíveis, consulte Operadores do Google Dataform.

Instale o pacote google-cloud-dataform do PyPi

Se você usa as versões 2.0.25 e mais recentes do Cloud Composer 2, esse pacote é pré-instalado no seu ambiente. Não é necessário instalar nada.

Se você usa versões anteriores do Cloud Composer 2, instale o pacote google-cloud-dataform do PyPi.

Na seção "Pacotes PyPI", especifique a versão ==0.2.0.

Criar um DAG do Airflow que programa invocações de fluxo de trabalho do Dataform

Para gerenciar execuções programadas de fluxos de trabalho do Dataform com o Cloud Composer 2, escreva o DAG usando os operadores do Dataform Airflow e faça upload dele para o bucket do seu ambiente.

O exemplo de código a seguir mostra um DAG do Airflow que cria um resultado de compilação do Dataform e inicia uma invocação de fluxo de trabalho do Dataform:

from datetime import datetime

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
         workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
        },
    )


create_compilation_result >> create_workflow_invocation

Substitua:

  • PROJECT_ID: o ID do projeto do Dataform Google Cloud .
  • REPOSITORY_ID: o nome do seu repositório do Dataform.
  • REGION: a região em que o repositório do Dataform está localizado.
  • COMPILATION_RESULT: o nome do resultado da compilação que você quer usar para essa invocação de fluxo de trabalho.
  • GIT_COMMITISH: o commitish do Git no repositório Git remoto da versão do código que você quer usar, por exemplo, uma ramificação ou um SHA do Git.

O exemplo de código a seguir mostra um DAG do Airflow que realiza as seguintes ações:

  1. Cria um resultado de compilação do Dataform.
  2. Inicia uma invocação assíncrona do fluxo de trabalho do Dataform.
  3. Pesquisa o status do fluxo de trabalho até que ele entre no estado esperado usando DataformWorkflowInvocationStateSensor.
from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id='create_workflow_invocation',
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": COMPILATION_RESULT
    }
)

is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
    task_id="is_workflow_invocation_done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
    expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)


create_compilation_result >> create_workflow_invocation

Substitua:

  • PROJECT_ID: seu Dataform Google Cloud projectID.
  • REPOSITORY_ID: o nome do seu repositório do Dataform.
  • REGION: a região em que o repositório do Dataform está localizado.
  • COMPILATION_RESULT: o nome do resultado da compilação que você quer usar para essa invocação de fluxo de trabalho.
  • GIT_COMMITISH: o commitish do Git no repositório Git remoto da versão do código que você quer usar, por exemplo, uma ramificação ou um SHA do Git.
  • COMPILATION_RESULT: o nome do resultado da compilação que você quer usar para essa invocação de fluxo de trabalho.

Adicionar parâmetros de configuração de compilação

É possível adicionar outros parâmetros de configuração de compilação ao objeto DAG do Airflow create_compilation_result. Para mais informações sobre os parâmetros disponíveis, consulte a referência da API Dataform CodeCompilationConfig.

  • Para adicionar parâmetros de configuração de compilação ao objeto DAG do Airflow create_compilation_result, adicione os parâmetros selecionados ao campo code_compilation_config no seguinte formato:

        create_compilation_result = DataformCreateCompilationResultOperator(
            task_id="create_compilation_result",
            project_id=PROJECT_ID,
            region=REGION,
            repository_id=REPOSITORY_ID,
            compilation_result={
                "git_commitish": GIT_COMMITISH,
                "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
            },
        )
    

    Substitua:

    • PROJECT_ID: o ID do projeto do Dataform Google Cloud .
    • REPOSITORY_ID: o nome do seu repositório do Dataform.
    • REGION: a região em que o repositório do Dataform está localizado.
    • GIT_COMMITISH: o commitish do Git no repositório Git remoto da versão do código que você quer usar, por exemplo, uma ramificação ou um SHA do Git.
    • PARAMETER: o parâmetro CodeCompilationConfig selecionado. É possível adicionar vários parâmetros.
    • PARAMETER_VALUE: o valor do parâmetro selecionado.

O exemplo de código a seguir mostra o parâmetro defaultDatabase adicionado ao objeto DAG do Airflow create_compilation_result:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": REMOTE_BRANCH,
            "code_compilation_config": { "default_database": "my-custom-gcp-project"}
        },
    )

Adicionar parâmetros de configuração de invocação do fluxo de trabalho

É possível adicionar outros parâmetros de configuração de invocação de fluxo de trabalho ao objeto DAG do Airflow create_workflow_invocation. Para mais informações sobre os parâmetros disponíveis, consulte a referência da API Dataform InvocationConfig.

  • Para adicionar parâmetros de configuração de invocação de fluxo de trabalho ao objeto DAG do Airflow create_workflow_invocation, adicione os parâmetros selecionados ao campo invocation_config no seguinte formato:

        create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
            task_id='create_workflow_invocation',
            project_id=PROJECT_ID,
            region=REGION,
            repository_id=REPOSITORY_ID,
            workflow_invocation={
                "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
                "invocation_config": { "PARAMETER": PARAMETER_VALUE }
            },
        )
    
    

    Substitua:

    • PROJECT_ID: o ID do projeto do Dataform Google Cloud .
    • REPOSITORY_ID: o nome do seu repositório do Dataform.
    • REGION: a região em que o repositório do Dataform está localizado.
    • PARAMETER: o parâmetro InvocationConfig selecionado. É possível adicionar vários parâmetros.
    • PARAMETER_VALUE: o valor do parâmetro selecionado.

O exemplo de código a seguir mostra os parâmetros includedTags[] e transitiveDependenciesIncluded adicionados ao objeto DAG do Airflow create_workflow_invocation:

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            "invocation_config": { "included_tags": ["daily"], "transitive_dependencies_included": true }
        },
    )

A seguir