Pianificare le esecuzioni

Questo documento mostra come eseguire le seguenti operazioni in Dataform:

Prima di iniziare

Per pianificare le esecuzioni con le configurazioni dei flussi di lavoro o pianificare le esecuzioni con i flussi di lavoro e Cloud Scheduler, procedi nel seguente modo:

  1. Nella Google Cloud console, vai alla pagina Dataform.

    Vai a Dataform

  2. Seleziona o crea un repository.

  3. Crea una configurazione della release.

Per pianificare le esecuzioni con Cloud Composer, svolgi i seguenti passaggi:

  1. Seleziona o crea un repository Dataform.
  2. Concedi a Dataform l'accesso a BigQuery.
  3. Seleziona o crea uno spazio di lavoro Dataform.
  4. Crea almeno una tabella.
  5. Crea un ambiente Cloud Composer 2.

Ruoli obbligatori

Per ottenere le autorizzazioni necessarie per completare le attività descritte in questo documento, chiedi all'amministratore di concederti i seguenti ruoli IAM:

Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.

Potresti anche riuscire a ottenere le autorizzazioni richieste tramite i ruoli personalizzati o altri ruoli predefiniti.

Per utilizzare un account di servizio diverso da quello predefinito di Dataform, concedi l'accesso all'account di servizio personalizzato.

Per attivare le esecuzioni pianificate per una configurazione del flusso di lavoro quando è attivata la modalità di sostituzione rigorosa, devi concedere l'autorizzazione iam.serviceAccounts.actAs all'account di servizio Dataform per l'account di servizio utilizzato nella configurazione del flusso di lavoro. Questa autorizzazione è disponibile nel ruolo Utente account di servizio (roles/iam.serviceAccountUser).

Per utilizzare le credenziali utente dell'Account Google durante la creazione di una configurazione del flusso di lavoro (Anteprima), concedi l'accesso all'Account Google.

Pianificare le esecuzioni con le configurazioni del flusso di lavoro

Questa sezione mostra come creare una configurazione del flusso di lavoro in Dataform per pianificare e configurare le esecuzioni del flusso di lavoro. Puoi utilizzare le configurazioni del workflow per eseguire i flussi di lavoro Dataform in base a una pianificazione.

Informazioni sulle configurazioni dei flussi di lavoro

Per pianificare le esecuzioni di Dataform di tutte o di alcune azioni di workflow in BigQuery, puoi creare configurazioni di workflow. In una configurazione del workflow, seleziona una configurazione della release di compilazione, le azioni del workflow per l'esecuzione e imposta la pianificazione dell'esecuzione.

Poi, durante un'esecuzione pianificata della configurazione del flusso di lavoro, Dataform esegue il deployment della selezione di azioni dall'ultimo risultato di compilazione nella configurazione della release in BigQuery. Puoi anche attivare manualmente l'esecuzione di una configurazione del flusso di lavoro con la funzione workflowConfigs dell'API Dataform.

Una configurazione del flusso di lavoro Dataform contiene le seguenti impostazioni di esecuzione:

  • ID della configurazione del flusso di lavoro.
  • Configurazione della release.
  • Account di servizio.

    Si tratta dell'account di servizio associato alla configurazione del flusso di lavoro. Puoi selezionare l'account di servizio Dataform predefinito o un account di servizio associato al tuo Google Cloud progetto oppure inserire manualmente un altro account di servizio. Per impostazione predefinita, le configurazioni dei flussi di lavoro utilizzano gli stessi account di servizio dei relativi repositories.

    Le credenziali dell'account di servizio sono il metodo di autorizzazione predefinito per la creazione e le esecuzioni delle configurazioni dei workflow pianificati.

  • Credenziali utente dell'Account Google (anteprima)

    Le credenziali utente dell'Account Google sono il metodo di autorizzazione predefinito per la creazione ed esecuzione di configurazioni di flussi di lavoro manuali e non pianificati. Per maggiori informazioni, vedi Autorizzare l'Account Google.

  • Azioni del flusso di lavoro da eseguire:

    • Tutte le azioni.
    • Selezione di azioni.
    • Selezione di tag.
  • Pianificazione delle esecuzioni e fuso orario.

Crea una configurazione di flusso di lavoro

Per creare una configurazione del flusso di lavoro di Dataform:

  1. Nel repository, vai a Uscite e pianificazione.
  2. Nella sezione Configurazioni dei flussi di lavoro, fai clic su Crea.
  3. Nel riquadro Crea configurazione del flusso di lavoro, inserisci un ID univoco per la configurazione del flusso di lavoro nel campo ID configurazione.

    Gli ID possono includere solo numeri, lettere, trattini e trattini bassi.

  4. Nel menu Configurazione della release, seleziona una configurazione della release della compilazione.

  5. Nella sezione Autenticazione, autorizza la configurazione del flusso di lavoro con le credenziali utente del tuo Account Google o con un account di servizio.

    • Per utilizzare le credenziali utente del tuo Account Google (Anteprima), seleziona Esegui con le mie credenziali utente.
    • Per utilizzare un account di servizio, seleziona Esegui con l'account di servizio selezionato, quindi seleziona l'account di servizio Dataform predefinito o qualsiasi account di servizio associato al tuo progettoGoogle Cloud a cui hai accesso. Se non selezioni un account di servizio, la configurazione del flusso di lavoro utilizza l'account di servizio del repository.
  6. (Facoltativo) Nel campo Frequenza pianificazione, inserisci la frequenza delle esecuzioni nel formato cron di Unix.

    Per verificare che Dataform esegua l'ultimo risultato di compilazione nella configurazione della release corrispondente, mantieni un'interruzione minima di un'ora tra l'ora di creazione del risultato di compilazione e l'ora di esecuzione pianificata.

  7. (Facoltativo) Nel menu Fuso orario, seleziona il fuso orario per le esecuzioni.

    Il fuso orario predefinito è UTC.

  8. Seleziona le azioni del flusso di lavoro da eseguire:

    • Per eseguire l'intero flusso di lavoro, fai clic su Tutte le azioni.
    • Per eseguire le azioni selezionate nel flusso di lavoro, fai clic su Selezione di azioni e poi seleziona le azioni.
    • Per eseguire azioni con i tag selezionati, fai clic su Selezione di tag e seleziona i tag.
    • (Facoltativo) Per eseguire le azioni o i tag selezionati e le relative dipendenze, seleziona l'opzione Includi dipendenze.
    • (Facoltativo) Per eseguire le azioni o i tag selezionati e i relativi elementi dipendenti, seleziona l'opzione Includi elementi dipendenti.
    • (Facoltativo) Per ricostruire tutte le tabelle da zero, seleziona l'opzione Esegui con aggiornamento completo.

    Senza questa opzione, Dataform aggiorna le tabelle incrementali senza ricostruirle da zero.

  9. Fai clic su Crea. Se hai selezionato Esegui con le mie credenziali utente come metodo di autenticazione, devi autorizzare il tuo Account Google (anteprima).

Ad esempio, la seguente configurazione del flusso di lavoro esegue azioni con il tag hourly ogni ora nel fuso orario CEST:

  • ID configurazione: production-hourly
  • Configurazione della release: -
  • Frequenza: 0 * * * *
  • Fuso orario: Central European Summer Time (CEST)
  • Selezione di azioni di flusso di lavoro: selezione di tag, tag hourly

Autorizzare l'Account Google

Per autenticare la risorsa con le credenziali utente del tuo Account Google, devi concedere manualmente l'autorizzazione alle pipeline BigQuery per recuperare il token di accesso per il tuo Account Google e accedere ai dati di origine per tuo conto. Puoi concedere l'approvazione manuale con l'interfaccia della finestra di dialogo OAuth.

Devi concedere l'autorizzazione alle pipeline BigQuery una sola volta.

Per revocare l'autorizzazione che hai concesso:

  1. Vai alla pagina del tuo Account Google.
  2. Fai clic su Pipeline BigQuery.
  3. Fai clic su Rimuovi accesso.

La modifica del proprietario della configurazione del workflow aggiornando le credenziali richiede anche l'approvazione manuale se il nuovo proprietario dell'Account Google non ha mai creato una configurazione del workflow.

Modificare una configurazione del flusso di lavoro

Per modificare una configurazione del flusso di lavoro:

  1. Nel repository, vai a Uscite e pianificazione.
  2. Accanto alla configurazione del flusso di lavoro che vuoi modificare, fai clic sul menu Altro e poi su Modifica.
  3. Nel riquadro Modifica configurazione del flusso di lavoro, modifica le impostazioni della configurazione della release e poi fai clic su Salva.

Eliminare una configurazione del flusso di lavoro

Per eliminare una configurazione del flusso di lavoro:

  1. Nel repository, vai a Uscite e pianificazione.
  2. Accanto alla configurazione del flusso di lavoro che vuoi eliminare, fai clic sul menu Altro e poi su Elimina.
  3. Nella finestra di dialogo Elimina configurazione della release, fai clic su Elimina.

Pianificare le esecuzioni con Workflows e Cloud Scheduler

Questa sezione mostra come pianificare le esecuzioni dei flussi di lavoro Dataform utilizzando Workflows e Cloud Scheduler.

Informazioni sulle esecuzioni pianificate del flusso di lavoro

Puoi impostare la frequenza di esecuzione dei flussi di lavoro di Dataform creando un job Cloud Scheduler che attivi un flusso di lavoro Workflows. Workflows esegue i servizi in un flusso di lavoro di orchestrazione definito da te.

Workflows esegue il flusso di lavoro Dataform in un procedura in due fasi. Innanzitutto, estrae il codice del repository Dataform dal provider Git e lo compila in un risultato di compilazione. Poi utilizza il risultato della compilazione per creare un flusso di lavoro Dataform e lo esegue con la frequenza impostata.

Creare un flusso di lavoro di orchestrazione pianificato

Per pianificare le esecuzioni del flusso di lavoro Dataform, utilizza Workflows per creare un flusso di lavoro di orchestrazione e aggiungi un job Cloud Scheduler come attivatore.

  1. Workflows utilizza i service account per concedere ai flussi di lavoro l'accesso alle Google Cloud risorse. Crea un account di servizio e concedigli il ruolo di Identity and Access Management Editor di Dataform (roles/dataform.editor) nonché le autorizzazioni minime richieste per gestire il flusso di lavoro di orchestrazione. Per ulteriori informazioni, vedi Concedere l'autorizzazione dei workflow per l'accesso alle Google Cloud risorse.

  2. Crea un flusso di lavoro di orchestrazione e utilizza il seguente codice sorgente YAML come definizione del flusso di lavoro:

    main:
        steps:
        - init:
            assign:
            - repository: projects/PROJECT_ID/locations/REPOSITORY_LOCATION/repositories/REPOSITORY_ID
        - createCompilationResult:
            call: http.post
            args:
                url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
                auth:
                    type: OAuth2
                body:
                    gitCommitish: GIT_COMMITISH
            result: compilationResult
        - createWorkflowInvocation:
            call: http.post
            args:
                url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
                auth:
                    type: OAuth2
                body:
                    compilationResult: ${compilationResult.body.name}
            result: workflowInvocation
        - complete:
            return: ${workflowInvocation.body.name}
    

    Sostituisci quanto segue:

    • PROJECT_ID: l'ID del tuo Google Cloud progetto.
    • REPOSITORY_LOCATION: la posizione del repository Dataform.
    • REPOSITORY_ID: il nome del repository Dataform.
    • GIT_COMMITISH: il ramo Git da cui vuoi eseguire il codice Dataform. Per un repository appena creato, sostituisci con main.
  3. Pianifica il flusso di lavoro di orchestrazione utilizzando Cloud Scheduler.

Personalizzare la richiesta di risultato della compilazione della creazione del flusso di lavoro Dataform

Puoi aggiornare il flusso di lavoro di orchestrazione esistente e definire le impostazioni della richiesta di creazione del risultato della compilazione del flusso di lavoro Dataform in formato YAML. Per ulteriori informazioni sulle impostazioni, consulta la projects.locations.repositories.compilationResultsguida di riferimento alle risorse REST.

Ad esempio, per aggiungere un'impostazione _dev schemaSuffix a tutte le azioni durante la compilazione, sostituire il corpo del passaggio createCompilationResult con il seguente snippet di codice:

    - createCompilationResult:
        call: http.post
        args:
            url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
            auth:
                type: OAuth2
            body:
                gitCommitish: GIT_COMMITISH
                codeCompilationConfig:
                    schemaSuffix: dev

Puoi anche passare impostazioni aggiuntive come argomenti di runtime in una richiesta di esecuzione di Workflows e accedere a questi argomenti utilizzando le variabili. Per ulteriori informazioni, consulta Passare gli argomenti di runtime in una richiesta di esecuzione.

Personalizzare la richiesta di chiamata del flusso di lavoro Dataform

Puoi aggiornare il flusso di lavoro di orchestrazione esistente e definire le impostazioni della richiesta di chiamata del flusso di lavoro Dataform nel formato YAML. Per ulteriori informazioni sulle impostazioni della richiesta di chiamata, consulta la projects.locations.repositories.workflowInvocationsguida di riferimento alle risorse REST.

Ad esempio, per eseguire solo azioni con il tag hourly con tutte le dipendenze trascendenti incluse, sostituisci il corpo createWorkflowInvocation con lo snippet di codice seguente:

    - createWorkflowInvocation:
        call: http.post
        args:
            url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
            auth:
                type: OAuth2
            body:
                compilationResult: ${compilationResult.body.name}
                invocationConfig:
                    includedTags:
                    - hourly
                    transitiveDependenciesIncluded: true
                

Puoi anche passare impostazioni aggiuntive come argomenti di runtime in una richiesta di esecuzione di Workflows e accedere a questi argomenti utilizzando le variabili. Per ulteriori informazioni, consulta Passare gli argomenti di runtime in una richiesta di esecuzione.

Pianificare le esecuzioni con Cloud Composer

Puoi utilizzare Cloud Composer 2 per pianificare le esecuzioni di Dataform. Dataform non supporta Cloud Composer 1.

Per gestire le pianificazioni delle esecuzioni di Dataform con Cloud Composer 2, puoi utilizzare operatori Dataform nei grafici diretti aciclici (DAG) di Airflow. Puoi creare un DAG Airflow che pianifica le invocazioni del flusso di lavoro Dataform.

Dataform fornisce vari operatori Airflow. Sono inclusi gli operatori per ottenere un risultato di compilazione, un'invocazione di flusso di lavoro e l'annullamento di un'invocazione di flusso di lavoro. Per visualizzare l'elenco completo degli operatori Dataform Airflow disponibili, consulta Operatori Google Dataform.

Installa il pacchetto PyPi google-cloud-dataform

Se utilizzi le versioni 2.0.25 e successive di Cloud Composer 2, questo pacchetto è preinstallato nel tuo ambiente. Non è necessario installarlo.

Se utilizzi versioni precedenti di Cloud Composer 2, installa il pacchetto PyPi google-cloud-dataform.

Nella sezione dei pacchetti PyPI, specifica la versione ==0.2.0.

Crea un DAG Airflow che pianifica le invocazioni del flusso di lavoro Dataform

Per gestire le esecuzioni pianificate dei flussi di lavoro Dataform con Cloud Composer 2, scrivi il DAG utilizzando gli operatori Airflow di Dataform, quindi caricalo nel bucket del tuo ambiente.

Il seguente esempio di codice mostra un DAG Airflow che crea un risultato di compilazione Dataform e avvia un'invocazione del flusso di lavoro Dataform:

from datetime import datetime

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
         workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
        },
    )


create_compilation_result >> create_workflow_invocation

Sostituisci quanto segue:

  • PROJECT_ID: il tuo ID progetto Google Cloud Dataform.
  • REPOSITORY_ID: il nome del repository Dataform.
  • REGION: la regione in cui si trova il repository Dataform.
  • COMPILATION_RESULT: il nome del risultato della compilazione da utilizzare per questa chiamata del flusso di lavoro.
  • GIT_COMMITISH: il commitish Git nel repository Git remoto della versione del codice che vuoi utilizzare, ad esempio un ramo o un SHA Git.

Il seguente esempio di codice mostra un DAG Airflow che esegue le seguenti operazioni:

  1. Crea un risultato di compilazione di Dataform.
  2. Avvia un'invocazione asincrona del flusso di lavoro Dataform.
  3. Effettua il polling dello stato del flusso di lavoro finché non entra nello stato previsto utilizzando DataformWorkflowInvocationStateSensor.
from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id='create_workflow_invocation',
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": COMPILATION_RESULT
    }
)

is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
    task_id="is_workflow_invocation_done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
    expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)


create_compilation_result >> create_workflow_invocation

Sostituisci quanto segue:

  • PROJECT_ID: il tuo ID progetto Google Cloud Dataform.
  • REPOSITORY_ID: il nome del repository Dataform.
  • REGION: la regione in cui si trova il repository Dataform.
  • COMPILATION_RESULT: il nome del risultato della compilazione da utilizzare per questa chiamata del flusso di lavoro.
  • GIT_COMMITISH: il commitish Git nel repository Git remoto della versione del codice che vuoi utilizzare, ad esempio un ramo o un SHA Git.
  • COMPILATION_RESULT: il nome del risultato della compilazione da utilizzare per questa chiamata del flusso di lavoro.

Aggiungere i parametri di configurazione della compilazione

Puoi aggiungere altri parametri di configurazione della compilazione all'create_compilation_resultoggetto DAG di Airflow. Per ulteriori informazioni sui parametri disponibili, consulta la documentazione di riferimento dell'API CodeCompilationConfig Dataform.

  • Per aggiungere parametri di configurazione della compilazione all'oggetto DAG create_compilation_result Airflow, aggiungi i parametri selezionati al campo code_compilation_config nel seguente formato:

        create_compilation_result = DataformCreateCompilationResultOperator(
            task_id="create_compilation_result",
            project_id=PROJECT_ID,
            region=REGION,
            repository_id=REPOSITORY_ID,
            compilation_result={
                "git_commitish": GIT_COMMITISH,
                "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
            },
        )
    

    Sostituisci quanto segue:

    • PROJECT_ID: il tuo ID progetto Google Cloud Dataform.
    • REPOSITORY_ID: il nome del repository Dataform.
    • REGION: la regione in cui si trova il repository Dataform.
    • GIT_COMMITISH: il commitish Git nel repository Git remoto della versione del codice che vuoi utilizzare, ad esempio un ramo o un SHA Git.
    • PARAMETER: il parametro CodeCompilationConfig selezionato. Puoi aggiungere più parametri.
    • PARAMETER_VALUE: il valore del parametro selezionato.

Il seguente esempio di codice mostra il parametro defaultDatabase aggiunto all'oggetto DAG Airflow create_compilation_result:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": REMOTE_BRANCH,
            "code_compilation_config": { "default_database": "my-custom-gcp-project"}
        },
    )

Aggiungi i parametri di configurazione dell'invocazione del workflow

Puoi aggiungere altri parametri di configurazione dell'invocazione del flusso di lavoro all'create_workflow_invocationoggetto DAG di Airflow. Per ulteriori informazioni sui parametri disponibili, consulta la documentazione di riferimento dell'API InvocationConfig Dataform.

  • Per aggiungere i parametri di configurazione dell'invocazione del flusso di lavoro all'create_workflow_invocationoggetto DAG Airflow, aggiungi i parametri selezionati al campo invocation_config nel seguente formato:

        create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
            task_id='create_workflow_invocation',
            project_id=PROJECT_ID,
            region=REGION,
            repository_id=REPOSITORY_ID,
            workflow_invocation={
                "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
                "invocation_config": { "PARAMETER": PARAMETER_VALUE }
            },
        )
    
    

    Sostituisci quanto segue:

    • PROJECT_ID: il tuo ID progetto Google Cloud Dataform.
    • REPOSITORY_ID: il nome del repository Dataform.
    • REGION: la regione in cui si trova il repository Dataform.
    • PARAMETER: il parametro InvocationConfig selezionato. Puoi aggiungere più parametri.
    • PARAMETER_VALUE: il valore del parametro selezionato.

Il seguente esempio di codice mostra i parametri includedTags[] e transitiveDependenciesIncluded aggiunti all'oggetto DAG create_workflow_invocation Airflow:

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            "invocation_config": { "included_tags": ["daily"], "transitive_dependencies_included": true }
        },
    )

Passaggi successivi