Questo documento mostra come eseguire le seguenti operazioni in Dataform:
- Pianifica le esecuzioni con le configurazioni dei flussi di lavoro.
- Pianifica le esecuzioni con Workflows e Cloud Scheduler.
- Pianifica le esecuzioni con Cloud Composer.
Prima di iniziare
Per pianificare le esecuzioni con le configurazioni dei flussi di lavoro o pianificare le esecuzioni con i flussi di lavoro e Cloud Scheduler, procedi nel seguente modo:
Nella Google Cloud console, vai alla pagina Dataform.
Seleziona o crea un repository.
Crea una configurazione della release.
Per pianificare le esecuzioni con Cloud Composer, svolgi i seguenti passaggi:
- Seleziona o crea un repository Dataform.
- Concedi a Dataform l'accesso a BigQuery.
- Seleziona o crea uno spazio di lavoro Dataform.
- Crea almeno una tabella.
- Crea un ambiente Cloud Composer 2.
Ruoli obbligatori
Per ottenere le autorizzazioni necessarie per completare le attività descritte in questo documento, chiedi all'amministratore di concederti i seguenti ruoli IAM:
-
Amministratore Dataform (
roles/dataform.admin
) nei repository -
Worker Composer (
roles/composer.worker
) nell'account di servizio dell'ambiente Cloud Composer
Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.
Potresti anche riuscire a ottenere le autorizzazioni richieste tramite i ruoli personalizzati o altri ruoli predefiniti.
Per utilizzare un account di servizio diverso da quello predefinito di Dataform, concedi l'accesso all'account di servizio personalizzato.
Per attivare le esecuzioni pianificate per una configurazione del flusso di lavoro quando è attivata la modalità di sostituzione rigorosa, devi concedere l'autorizzazione iam.serviceAccounts.actAs
all'account di servizio Dataform per l'account di servizio utilizzato nella configurazione del flusso di lavoro. Questa autorizzazione è disponibile nel ruolo Utente account di servizio (roles/iam.serviceAccountUser
).
Per utilizzare le credenziali utente dell'Account Google durante la creazione di una configurazione del flusso di lavoro (Anteprima), concedi l'accesso all'Account Google.
Pianificare le esecuzioni con le configurazioni del flusso di lavoro
Questa sezione mostra come creare una configurazione del flusso di lavoro in Dataform per pianificare e configurare le esecuzioni del flusso di lavoro. Puoi utilizzare le configurazioni del workflow per eseguire i flussi di lavoro Dataform in base a una pianificazione.
Informazioni sulle configurazioni dei flussi di lavoro
Per pianificare le esecuzioni di Dataform di tutte o di alcune azioni di workflow in BigQuery, puoi creare configurazioni di workflow. In una configurazione del workflow, seleziona una configurazione della release di compilazione, le azioni del workflow per l'esecuzione e imposta la pianificazione dell'esecuzione.
Poi, durante un'esecuzione pianificata della configurazione del flusso di lavoro, Dataform esegue il deployment della selezione di azioni dall'ultimo risultato di compilazione nella configurazione della release in BigQuery. Puoi anche attivare manualmente l'esecuzione di una configurazione del flusso di lavoro con la funzione workflowConfigs dell'API Dataform.
Una configurazione del flusso di lavoro Dataform contiene le seguenti impostazioni di esecuzione:
- ID della configurazione del flusso di lavoro.
- Configurazione della release.
Account di servizio.
Si tratta dell'account di servizio associato alla configurazione del flusso di lavoro. Puoi selezionare l'account di servizio Dataform predefinito o un account di servizio associato al tuo Google Cloud progetto oppure inserire manualmente un altro account di servizio. Per impostazione predefinita, le configurazioni dei flussi di lavoro utilizzano gli stessi account di servizio dei relativi repositories.
Le credenziali dell'account di servizio sono il metodo di autorizzazione predefinito per la creazione e le esecuzioni delle configurazioni dei workflow pianificati.
Credenziali utente dell'Account Google (anteprima)
Le credenziali utente dell'Account Google sono il metodo di autorizzazione predefinito per la creazione ed esecuzione di configurazioni di flussi di lavoro manuali e non pianificati. Per maggiori informazioni, vedi Autorizzare l'Account Google.
Azioni del flusso di lavoro da eseguire:
- Tutte le azioni.
- Selezione di azioni.
- Selezione di tag.
Pianificazione delle esecuzioni e fuso orario.
Crea una configurazione di flusso di lavoro
Per creare una configurazione del flusso di lavoro di Dataform:
- Nel repository, vai a Uscite e pianificazione.
- Nella sezione Configurazioni dei flussi di lavoro, fai clic su Crea.
Nel riquadro Crea configurazione del flusso di lavoro, inserisci un ID univoco per la configurazione del flusso di lavoro nel campo ID configurazione.
Gli ID possono includere solo numeri, lettere, trattini e trattini bassi.
Nel menu Configurazione della release, seleziona una configurazione della release della compilazione.
Nella sezione Autenticazione, autorizza la configurazione del flusso di lavoro con le credenziali utente del tuo Account Google o con un account di servizio.
- Per utilizzare le credenziali utente del tuo Account Google (Anteprima), seleziona Esegui con le mie credenziali utente.
- Per utilizzare un account di servizio, seleziona Esegui con l'account di servizio selezionato, quindi seleziona l'account di servizio Dataform predefinito o qualsiasi account di servizio associato al tuo progettoGoogle Cloud a cui hai accesso. Se non selezioni un account di servizio, la configurazione del flusso di lavoro utilizza l'account di servizio del repository.
(Facoltativo) Nel campo Frequenza pianificazione, inserisci la frequenza delle esecuzioni nel formato cron di Unix.
Per verificare che Dataform esegua l'ultimo risultato di compilazione nella configurazione della release corrispondente, mantieni un'interruzione minima di un'ora tra l'ora di creazione del risultato di compilazione e l'ora di esecuzione pianificata.
(Facoltativo) Nel menu Fuso orario, seleziona il fuso orario per le esecuzioni.
Il fuso orario predefinito è UTC.
Seleziona le azioni del flusso di lavoro da eseguire:
- Per eseguire l'intero flusso di lavoro, fai clic su Tutte le azioni.
- Per eseguire le azioni selezionate nel flusso di lavoro, fai clic su Selezione di azioni e poi seleziona le azioni.
- Per eseguire azioni con i tag selezionati, fai clic su Selezione di tag e seleziona i tag.
- (Facoltativo) Per eseguire le azioni o i tag selezionati e le relative dipendenze, seleziona l'opzione Includi dipendenze.
- (Facoltativo) Per eseguire le azioni o i tag selezionati e i relativi elementi dipendenti, seleziona l'opzione Includi elementi dipendenti.
- (Facoltativo) Per ricostruire tutte le tabelle da zero, seleziona l'opzione Esegui con aggiornamento completo.
Senza questa opzione, Dataform aggiorna le tabelle incrementali senza ricostruirle da zero.
Fai clic su Crea. Se hai selezionato Esegui con le mie credenziali utente come metodo di autenticazione, devi autorizzare il tuo Account Google (anteprima).
Ad esempio, la seguente configurazione del flusso di lavoro esegue azioni con il tag hourly
ogni ora nel fuso orario CEST:
- ID configurazione:
production-hourly
- Configurazione della release: -
- Frequenza:
0 * * * *
- Fuso orario:
Central European Summer Time (CEST)
- Selezione di azioni di flusso di lavoro: selezione di tag, tag
hourly
Autorizzare l'Account Google
Per autenticare la risorsa con le credenziali utente del tuo Account Google, devi concedere manualmente l'autorizzazione alle pipeline BigQuery per recuperare il token di accesso per il tuo Account Google e accedere ai dati di origine per tuo conto. Puoi concedere l'approvazione manuale con l'interfaccia della finestra di dialogo OAuth.
Devi concedere l'autorizzazione alle pipeline BigQuery una sola volta.
Per revocare l'autorizzazione che hai concesso:
- Vai alla pagina del tuo Account Google.
- Fai clic su Pipeline BigQuery.
- Fai clic su Rimuovi accesso.
La modifica del proprietario della configurazione del workflow aggiornando le credenziali richiede anche l'approvazione manuale se il nuovo proprietario dell'Account Google non ha mai creato una configurazione del workflow.
Modificare una configurazione del flusso di lavoro
Per modificare una configurazione del flusso di lavoro:
- Nel repository, vai a Uscite e pianificazione.
- Accanto alla configurazione del flusso di lavoro che vuoi modificare, fai clic sul menu Altro e poi su Modifica.
- Nel riquadro Modifica configurazione del flusso di lavoro, modifica le impostazioni della configurazione della release e poi fai clic su Salva.
Eliminare una configurazione del flusso di lavoro
Per eliminare una configurazione del flusso di lavoro:
- Nel repository, vai a Uscite e pianificazione.
- Accanto alla configurazione del flusso di lavoro che vuoi eliminare, fai clic sul menu Altro e poi su Elimina.
- Nella finestra di dialogo Elimina configurazione della release, fai clic su Elimina.
Pianificare le esecuzioni con Workflows e Cloud Scheduler
Questa sezione mostra come pianificare le esecuzioni dei flussi di lavoro Dataform utilizzando Workflows e Cloud Scheduler.
Informazioni sulle esecuzioni pianificate del flusso di lavoro
Puoi impostare la frequenza di esecuzione dei flussi di lavoro di Dataform creando un job Cloud Scheduler che attivi un flusso di lavoro Workflows. Workflows esegue i servizi in un flusso di lavoro di orchestrazione definito da te.
Workflows esegue il flusso di lavoro Dataform in un procedura in due fasi. Innanzitutto, estrae il codice del repository Dataform dal provider Git e lo compila in un risultato di compilazione. Poi utilizza il risultato della compilazione per creare un flusso di lavoro Dataform e lo esegue con la frequenza impostata.
Creare un flusso di lavoro di orchestrazione pianificato
Per pianificare le esecuzioni del flusso di lavoro Dataform, utilizza Workflows per creare un flusso di lavoro di orchestrazione e aggiungi un job Cloud Scheduler come attivatore.
Workflows utilizza i service account per concedere ai flussi di lavoro l'accesso alle Google Cloud risorse. Crea un account di servizio e concedigli il ruolo di Identity and Access Management Editor di Dataform (
roles/dataform.editor
) nonché le autorizzazioni minime richieste per gestire il flusso di lavoro di orchestrazione. Per ulteriori informazioni, vedi Concedere l'autorizzazione dei workflow per l'accesso alle Google Cloud risorse.Crea un flusso di lavoro di orchestrazione e utilizza il seguente codice sorgente YAML come definizione del flusso di lavoro:
main: steps: - init: assign: - repository: projects/PROJECT_ID/locations/REPOSITORY_LOCATION/repositories/REPOSITORY_ID - createCompilationResult: call: http.post args: url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"} auth: type: OAuth2 body: gitCommitish: GIT_COMMITISH result: compilationResult - createWorkflowInvocation: call: http.post args: url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"} auth: type: OAuth2 body: compilationResult: ${compilationResult.body.name} result: workflowInvocation - complete: return: ${workflowInvocation.body.name}
Sostituisci quanto segue:
- PROJECT_ID: l'ID del tuo Google Cloud progetto.
- REPOSITORY_LOCATION: la posizione del repository Dataform.
- REPOSITORY_ID: il nome del repository Dataform.
- GIT_COMMITISH: il ramo Git da cui vuoi eseguire il codice Dataform. Per un repository appena creato, sostituisci con
main
.
Pianifica il flusso di lavoro di orchestrazione utilizzando Cloud Scheduler.
Personalizzare la richiesta di risultato della compilazione della creazione del flusso di lavoro Dataform
Puoi
aggiornare il flusso di lavoro di orchestrazione esistente
e definire le impostazioni della richiesta di creazione del risultato della compilazione del flusso di lavoro Dataform in formato YAML. Per ulteriori informazioni sulle impostazioni, consulta la projects.locations.repositories.compilationResults
guida di riferimento alle risorse REST.
Ad esempio, per aggiungere un'impostazione _dev
schemaSuffix
a tutte le azioni durante la compilazione,
sostituire il corpo del passaggio createCompilationResult
con il seguente snippet di codice:
- createCompilationResult:
call: http.post
args:
url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
auth:
type: OAuth2
body:
gitCommitish: GIT_COMMITISH
codeCompilationConfig:
schemaSuffix: dev
Puoi anche passare impostazioni aggiuntive come argomenti di runtime in una richiesta di esecuzione di Workflows e accedere a questi argomenti utilizzando le variabili. Per ulteriori informazioni, consulta Passare gli argomenti di runtime in una richiesta di esecuzione.
Personalizzare la richiesta di chiamata del flusso di lavoro Dataform
Puoi
aggiornare il flusso di lavoro di orchestrazione esistente
e definire le impostazioni della richiesta di chiamata del flusso di lavoro Dataform nel
formato YAML. Per ulteriori informazioni sulle impostazioni della richiesta di chiamata, consulta la projects.locations.repositories.workflowInvocations
guida di riferimento alle risorse REST.
Ad esempio, per eseguire solo azioni con il tag hourly
con tutte le dipendenze trascendenti incluse, sostituisci il corpo createWorkflowInvocation
con lo snippet di codice seguente:
- createWorkflowInvocation:
call: http.post
args:
url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
auth:
type: OAuth2
body:
compilationResult: ${compilationResult.body.name}
invocationConfig:
includedTags:
- hourly
transitiveDependenciesIncluded: true
Puoi anche passare impostazioni aggiuntive come argomenti di runtime in una richiesta di esecuzione di Workflows e accedere a questi argomenti utilizzando le variabili. Per ulteriori informazioni, consulta Passare gli argomenti di runtime in una richiesta di esecuzione.
Pianificare le esecuzioni con Cloud Composer
Puoi utilizzare Cloud Composer 2 per pianificare le esecuzioni di Dataform. Dataform non supporta Cloud Composer 1.
Per gestire le pianificazioni delle esecuzioni di Dataform con Cloud Composer 2, puoi utilizzare operatori Dataform nei grafici diretti aciclici (DAG) di Airflow. Puoi creare un DAG Airflow che pianifica le invocazioni del flusso di lavoro Dataform.
Dataform fornisce vari operatori Airflow. Sono inclusi gli operatori per ottenere un risultato di compilazione, un'invocazione di flusso di lavoro e l'annullamento di un'invocazione di flusso di lavoro. Per visualizzare l'elenco completo degli operatori Dataform Airflow disponibili, consulta Operatori Google Dataform.
Installa il pacchetto PyPi google-cloud-dataform
Se utilizzi le versioni 2.0.25
e successive di Cloud Composer 2, questo pacchetto è preinstallato nel tuo ambiente. Non è necessario installarlo.
Se utilizzi versioni precedenti di Cloud Composer 2,
installa il pacchetto PyPi google-cloud-dataform
.
Nella sezione dei pacchetti PyPI, specifica la versione ==0.2.0
.
Crea un DAG Airflow che pianifica le invocazioni del flusso di lavoro Dataform
Per gestire le esecuzioni pianificate dei flussi di lavoro Dataform con Cloud Composer 2, scrivi il DAG utilizzando gli operatori Airflow di Dataform, quindi caricalo nel bucket del tuo ambiente.
Il seguente esempio di codice mostra un DAG Airflow che crea un risultato di compilazione Dataform e avvia un'invocazione del flusso di lavoro Dataform:
from datetime import datetime
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
)
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
},
)
create_compilation_result >> create_workflow_invocation
Sostituisci quanto segue:
- PROJECT_ID: il tuo ID progetto Google Cloud Dataform.
- REPOSITORY_ID: il nome del repository Dataform.
- REGION: la regione in cui si trova il repository Dataform.
- COMPILATION_RESULT: il nome del risultato della compilazione da utilizzare per questa chiamata del flusso di lavoro.
- GIT_COMMITISH: il commitish Git nel repository Git remoto della versione del codice che vuoi utilizzare, ad esempio un ramo o un SHA Git.
Il seguente esempio di codice mostra un DAG Airflow che esegue le seguenti operazioni:
- Crea un risultato di compilazione di Dataform.
- Avvia un'invocazione asincrona del flusso di lavoro Dataform.
- Effettua il polling dello stato del flusso di lavoro finché non entra nello stato previsto utilizzando
DataformWorkflowInvocationStateSensor
.
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor
DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2022, 1, 1),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
asynchronous=True,
workflow_invocation={
"compilation_result": COMPILATION_RESULT
}
)
is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
task_id="is_workflow_invocation_done",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)
create_compilation_result >> create_workflow_invocation
Sostituisci quanto segue:
- PROJECT_ID: il tuo ID progetto Google Cloud Dataform.
- REPOSITORY_ID: il nome del repository Dataform.
- REGION: la regione in cui si trova il repository Dataform.
- COMPILATION_RESULT: il nome del risultato della compilazione da utilizzare per questa chiamata del flusso di lavoro.
- GIT_COMMITISH: il commitish Git nel repository Git remoto della versione del codice che vuoi utilizzare, ad esempio un ramo o un SHA Git.
- COMPILATION_RESULT: il nome del risultato della compilazione da utilizzare per questa chiamata del flusso di lavoro.
Aggiungere i parametri di configurazione della compilazione
Puoi aggiungere altri parametri di configurazione della compilazione all'create_compilation_result
oggetto DAG di Airflow. Per ulteriori informazioni sui parametri disponibili, consulta la documentazione di riferimento dell'API CodeCompilationConfig
Dataform.
Per aggiungere parametri di configurazione della compilazione all'oggetto DAG
create_compilation_result
Airflow, aggiungi i parametri selezionati al campocode_compilation_config
nel seguente formato:create_compilation_result = DataformCreateCompilationResultOperator( task_id="create_compilation_result", project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, compilation_result={ "git_commitish": GIT_COMMITISH, "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"} }, )
Sostituisci quanto segue:
- PROJECT_ID: il tuo ID progetto Google Cloud Dataform.
- REPOSITORY_ID: il nome del repository Dataform.
- REGION: la regione in cui si trova il repository Dataform.
- GIT_COMMITISH: il commitish Git nel repository Git remoto della versione del codice che vuoi utilizzare, ad esempio un ramo o un SHA Git.
- PARAMETER: il parametro
CodeCompilationConfig
selezionato. Puoi aggiungere più parametri. - PARAMETER_VALUE: il valore del parametro selezionato.
Il seguente esempio di codice mostra il parametro defaultDatabase
aggiunto all'oggetto DAG Airflow create_compilation_result
:
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": REMOTE_BRANCH,
"code_compilation_config": { "default_database": "my-custom-gcp-project"}
},
)
Aggiungi i parametri di configurazione dell'invocazione del workflow
Puoi aggiungere altri parametri di configurazione dell'invocazione del flusso di lavoro all'create_workflow_invocation
oggetto DAG di Airflow. Per ulteriori informazioni sui parametri disponibili, consulta la documentazione di riferimento dell'API InvocationConfig
Dataform.
Per aggiungere i parametri di configurazione dell'invocazione del flusso di lavoro all'
create_workflow_invocation
oggetto DAG Airflow, aggiungi i parametri selezionati al campoinvocation_config
nel seguente formato:create_workflow_invocation = DataformCreateWorkflowInvocationOperator( task_id='create_workflow_invocation', project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, workflow_invocation={ "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}", "invocation_config": { "PARAMETER": PARAMETER_VALUE } }, )
Sostituisci quanto segue:
- PROJECT_ID: il tuo ID progetto Google Cloud Dataform.
- REPOSITORY_ID: il nome del repository Dataform.
- REGION: la regione in cui si trova il repository Dataform.
- PARAMETER: il parametro
InvocationConfig
selezionato. Puoi aggiungere più parametri. - PARAMETER_VALUE: il valore del parametro selezionato.
Il seguente esempio di codice mostra i parametri includedTags[]
e
transitiveDependenciesIncluded
aggiunti all'oggetto DAG create_workflow_invocation
Airflow:
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
"invocation_config": { "included_tags": ["daily"], "transitive_dependencies_included": true }
},
)
Passaggi successivi
- Per scoprire come configurare le configurazioni della release di compilazione di Dataform, consulta Creare una configurazione della release.
- Per scoprire di più sul ciclo di vita del codice Dataform, consulta Introduzione al ciclo di vita del codice in Dataform.
- Per scoprire di più sull'API Dataform, consulta API Dataform.
- Per scoprire di più sugli ambienti Cloud Composer, consulta la Panoramica di Cloud Composer.
- Per scoprire di più sui prezzi di Workflows, consulta la pagina relativa ai prezzi di Workflows.