排定執行作業

本文件說明如何在 Dataform 中執行下列操作:

事前準備

如要使用工作流程設定排定執行作業,或使用工作流程和 Cloud Scheduler 排定執行作業,請執行下列操作:

  1. 在 Google Cloud 控制台中,前往「Dataform」頁面。

    前往 Dataform

  2. 選取或建立存放區

  3. 建立版本設定

如要使用 Cloud Composer 排定執行作業,請按照下列步驟操作:

  1. 選取或建立 Dataform 存放區
  2. 將 BigQuery 存取權授予 Dataform
  3. 選取或建立 Dataform 工作區
  4. 至少建立一個資料表
  5. 建立 Cloud Composer 2 環境

必要的角色

如要取得完成本文件中任務所需的權限,請管理員授予您下列 IAM 角色:

如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。

您或許還可透過自訂角色或其他預先定義的角色取得必要權限。

如要使用預設 Dataform 服務帳戶以外的服務帳戶,請授予自訂服務帳戶存取權

如要在啟用嚴格 act-as 模式時,為工作流程設定啟用排定執行作業,您必須為工作流程設定中使用的服務帳戶,將 iam.serviceAccounts.actAs 權限授予Dataform 服務帳戶服務帳戶使用者角色 (roles/iam.serviceAccountUser) 具有此權限。

如要在建立工作流程設定 (預先發布版) 時使用 Google 帳戶使用者憑證,請授予 Google 帳戶存取權

使用工作流程設定排定執行作業

本節將說明如何在 Dataform 中建立工作流程設定,以便排定及設定工作流程執行作業。您可以運用工作流程設定,讓系統按照排程執行 Dataform 工作流程。

關於工作流程設定

如要排定 Dataform 執行 BigQuery 中的所有或所選工作流程動作,您可以建立工作流程設定。在工作流程設定中,您可以選取編譯版本設定、選取要執行的工作流程動作,以及設定執行時程。

接著,在排定執行工作流程設定期間,Dataform 會將您在發布設定中選取的動作,從最新的編譯結果部署至 BigQuery。您也可以使用 Dataform API workflowConfigs,手動觸發工作流程設定的執行作業。

Dataform 工作流程設定包含下列執行設定:

  • 工作流程設定的 ID。
  • 版本設定。
  • 服務帳戶。

    這是與工作流程設定相關聯的服務帳戶。您可以選取預設的 Dataform 服務帳戶,或與 Google Cloud 專案相關聯的服務帳戶,也可以手動輸入其他服務帳戶。根據預設,工作流程設定會使用與repositories相同的服務帳戶。

    服務帳戶憑證是排定工作流程設定建立和執行作業的預設授權方法。

  • Google 帳戶使用者憑證 (預先發布版)

    如要手動建立及執行未排程的工作流程設定,預設授權方法為 Google 帳戶使用者憑證。詳情請參閱「授權給您的 Google 帳戶」。

  • 要執行的工作流程動作:

    • 所有動作。
    • 選取動作。
    • 選取代碼
  • 執行時間表和時區。

建立工作流程設定

如要建立 Dataform 工作流程設定,請按照下列步驟操作:

  1. 在儲存庫中,前往「發布內容與排程」
  2. 在「工作流程設定」部分中,按一下「建立」
  3. 在「Create workflow configuration」(建立工作流程設定) 窗格的「Configuration ID」(設定 ID) 欄位中,輸入工作流程設定的專屬 ID。

    ID 只能包含數字、英文字母、連字號和底線。

  4. 在「Release configuration」選單中,選取編譯版本設定

  5. 在「Authentication」部分,使用 Google 帳戶使用者憑證或服務帳戶授權工作流程設定。

    • 如要使用 Google 帳戶使用者憑證 (預覽),請選取「以我的使用者憑證執行」
    • 如要使用服務帳戶,請選取「Execute with selected service account」(使用所選服務帳戶執行),然後選取預設 Dataform 服務帳戶,或與您有存取權的Google Cloud 專案相關聯的任何服務帳戶。如果您未選取服務帳戶,工作流程設定會使用存放區的服務帳戶。
  6. 選用:在「Schedule frequency」欄位中,使用 unix-cron 格式輸入執行頻率。

    如要確認 Dataform 在對應的發布設定中執行最新的編譯結果,請在編譯結果建立時間和排定執行時間之間保留至少一小時的間隔。

  7. 選用:在「時區」選單中,選取執行作業的時區。

    預設時區為世界標準時間。

  8. 選取要執行的工作流程動作:

    • 如要執行整個工作流程,請按一下「所有動作」
    • 如要執行工作流程中所選動作,請按一下「動作選項」,然後選取動作。
    • 如要執行選取的代碼動作,請按一下「選取代碼」,然後選取代碼。
    • 選用步驟:如要執行所選動作或代碼及其依附元件,請選取「Include dependencies」選項。
    • 選用步驟:如要執行所選動作或代碼及其依附元件,請選取「Include dependents」選項。
    • 選用:如要從頭重新建構所有資料表,請選取「Run with full refresh」選項。

    如果沒有這個選項,Dataform 會更新增量資料表,但不會從頭重新建構。

  9. 按一下「建立」,如果您選取「使用我的使用者憑證執行」做為驗證方法,則必須授權 Google 帳戶 (預先發布版)。

舉例來說,下列工作流程設定會在 CEST 時區每小時執行帶有 hourly 標記的動作:

  • 設定 IDproduction-hourly
  • 版本設定:-
  • 頻率0 * * * *
  • 時區Central European Summer Time (CEST)
  • 選取工作流程動作:選取標記、hourly 標記

授權給您的 Google 帳戶

如要使用 Google 帳戶使用者憑證驗證資源,您必須手動授予 BigQuery 管道權限,以便取得 Google 帳戶的存取金鑰,並代您存取來源資料。您可以透過 OAuth 對話方塊介面手動核准。

您只需要一次授予 BigQuery 管道權限。

如要撤銷已授予的權限,請按照下列步驟操作:

  1. 前往 Google 帳戶頁面
  2. 按一下「BigQuery 管道」
  3. 按一下 [移除存取權]

如果新 Google 帳戶擁有者從未建立工作流程設定,則必須透過更新憑證來變更工作流程設定擁有者,也需要手動核准。

編輯工作流程設定

如要編輯工作流程設定,請按照下列步驟操作:

  1. 在儲存庫中,前往「發布內容與排程」
  2. 在要編輯的工作流程設定中,按一下 「More」選單,然後點選「Edit」
  3. 在「Edit workflow configuration」窗格中編輯版本設定,然後按一下「Save」

刪除工作流程設定

如要刪除工作流程設定,請按照下列步驟操作:

  1. 在儲存庫中,前往「發布內容與排程」
  2. 在要刪除的工作流程設定中,按一下 「More」(更多) 選單,然後點選「Delete」(刪除)
  3. 在「Delete release configuration」對話方塊中,按一下「Delete」

使用 Workflows 和 Cloud Scheduler 排定執行作業

本節將說明如何使用工作流程和 Cloud Scheduler 排定 Dataform 工作流程的執行時間。

關於排定的工作流程執行作業

您可以建立可觸發 Workflows 工作流程的 Cloud Scheduler 工作,藉此設定 Dataform 工作流程執行的頻率。Workflows 會在您定義的自動化調度管理工作流程中執行服務。

Workflows 會以兩個步驟的程序執行 Dataform 工作流程。首先,它會從 Git 供應器中提取 Dataform 存放區程式碼,並將其編譯為編譯結果。接著,系統會使用編譯結果建立 Dataform 工作流程,並依您設定的頻率執行。

建立已排定時間的自動化調度管理工作流程

如要排定 Dataform 工作流程的執行時間,請使用 Workflows 建立調度工作流程,並新增 Cloud Scheduler 工作做為觸發事件。

  1. Workflows 會使用服務帳戶,讓 workflow 存取Google Cloud 資源。建立服務帳戶,並授予該帳戶 Dataform 編輯器 (roles/dataform.editor) Identity and Access Management 角色,以及管理調度工作流程所需的最低權限。詳情請參閱「授予工作流程權限,以便存取 Google Cloud 資源」。

  2. 建立自動化調度管理工作流程,並使用下列 YAML 原始碼做為工作流程定義:

    main:
        steps:
        - init:
            assign:
            - repository: projects/PROJECT_ID/locations/REPOSITORY_LOCATION/repositories/REPOSITORY_ID
        - createCompilationResult:
            call: http.post
            args:
                url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
                auth:
                    type: OAuth2
                body:
                    gitCommitish: GIT_COMMITISH
            result: compilationResult
        - createWorkflowInvocation:
            call: http.post
            args:
                url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
                auth:
                    type: OAuth2
                body:
                    compilationResult: ${compilationResult.body.name}
            result: workflowInvocation
        - complete:
            return: ${workflowInvocation.body.name}
    

    更改下列內容:

    • PROJECT_ID: Google Cloud 專案的 ID。
    • REPOSITORY_LOCATION:Dataform 存放區的位置。
    • REPOSITORY_ID:Dataform 存放區名稱。
    • GIT_COMMITISH:您要執行 Dataform 程式碼的 Git 分支。如果是新建立的存放區,請改用 main
  3. 使用 Cloud Scheduler 排定自動化調度管理工作流程

自訂 Dataform 工作流程建立編譯結果要求

您可以更新現有的調度工作流程,並以 YAML 格式定義 Dataform 工作流程建立編譯結果要求設定。如要進一步瞭解這些設定,請參閱 projects.locations.repositories.compilationResults REST 資源參考資料

舉例來說,如要在編譯期間將 _dev schemaSuffix 設定新增至所有動作,請將 createCompilationResult 步驟主體替換為以下程式碼片段:

    - createCompilationResult:
        call: http.post
        args:
            url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/compilationResults"}
            auth:
                type: OAuth2
            body:
                gitCommitish: GIT_COMMITISH
                codeCompilationConfig:
                    schemaSuffix: dev

您也可以在工作流程執行要求中,將其他設定做為執行階段引數傳遞,並使用變數存取這些引數。詳情請參閱「在執行要求中傳遞執行階段引數」。

自訂 Dataform 工作流程叫用要求

您可以更新現有的調度工作流程,並以 YAML 格式定義 Dataform 工作流程叫用要求設定。如要進一步瞭解叫用要求設定,請參閱 projects.locations.repositories.workflowInvocations REST 資源參考資料

舉例來說,如要只執行含有所有傳遞依附元件的 hourly 標記動作,請將 createWorkflowInvocation 主體替換為下列程式碼片段:

    - createWorkflowInvocation:
        call: http.post
        args:
            url: ${"https://dataform.googleapis.com/v1beta1/" + repository + "/workflowInvocations"}
            auth:
                type: OAuth2
            body:
                compilationResult: ${compilationResult.body.name}
                invocationConfig:
                    includedTags:
                    - hourly
                    transitiveDependenciesIncluded: true
                

您也可以在工作流程執行要求中,將其他設定做為執行階段引數傳遞,並使用變數存取這些引數。詳情請參閱「在執行要求中傳遞執行階段引數」。

使用 Cloud Composer 排定執行作業

您可以使用 Cloud Composer 2 安排 Dataform 執行作業。Dataform 不支援 Cloud Composer 1

如要使用 Cloud Composer 2 管理 Dataform 執行作業的時間表,您可以在 Airflow 有向非循環圖 (DAG) 中使用 Dataform 運算子。您可以建立 Airflow DAG,以便排定 Dataform 工作流程的叫用作業。

Dataform 提供各種 Airflow 運算子。包括用於取得編譯結果、取得工作流程叫用作業,以及取消工作流程叫用作業的運算子。如要查看可用的 Dataform Airflow 運算子的完整清單,請參閱「Google Dataform 運算子」。

安裝 google-cloud-dataform PyPi 套件

如果您使用 Cloud Composer 2 版本 2.0.25 以上版本,這個套件會預先安裝在您的環境中。您不必安裝。

如果您使用的是較舊的 Cloud Composer 2 版本,請安裝 google-cloud-dataform PyPi 套件

在 PyPI 套件專區中,指定 ==0.2.0 版本。

建立排定 Dataform 工作流程叫用作業的 Airflow DAG

如要使用 Cloud Composer 2 管理 Dataform 工作流程的排程執行作業,請使用 Dataform Airflow 運算子編寫 DAG,然後將其上傳至環境的儲存桶

以下程式碼範例顯示 Airflow DAG,可建立 Dataform 編譯結果,並啟動 Dataform 工作流程叫用:

from datetime import datetime

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )
    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
         workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
        },
    )


create_compilation_result >> create_workflow_invocation

更改下列內容:

  • PROJECT_ID:您的 Dataform Google Cloud 專案 ID。
  • REPOSITORY_ID:Dataform 存放區名稱。
  • REGION:Dataform 存放區所在的區域。
  • COMPILATION_RESULT:您要用於這個工作流程叫用作業的編譯結果名稱。
  • GIT_COMMITISH:您要使用的程式碼版本的遠端 Git 存放區中的 Git 修訂版本,例如分支或 Git SHA。

以下程式碼範例顯示 Airflow DAG,可執行下列操作:

  1. 建立 Dataform 編譯結果。
  2. 啟動非同步 Dataform 工作流程叫用作業。
  3. 使用 DataformWorkflowInvocationStateSensor 輪詢工作流程狀態,直到工作流程進入預期狀態為止。
from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)
from airflow.providers.google.cloud.sensors.dataform import DataformWorkflowInvocationStateSensor

DAG_ID = "dataform"
PROJECT_ID = "PROJECT_ID"
REPOSITORY_ID = "REPOSITORY_ID"
REGION = "REGION"
GIT_COMMITISH = "GIT_COMMITISH"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2022, 1, 1),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
    task_id='create_workflow_invocation',
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    asynchronous=True,
    workflow_invocation={
        "compilation_result": COMPILATION_RESULT
    }
)

is_workflow_invocation_done = DataformWorkflowInvocationStateSensor(
    task_id="is_workflow_invocation_done",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    workflow_invocation_id=("{{ task_instance.xcom_pull('create_workflow_invocation')['name'].split('/')[-1] }}"),
    expected_statuses={WorkflowInvocation.State.SUCCEEDED},
)


create_compilation_result >> create_workflow_invocation

更改下列內容:

  • PROJECT_ID:您的 Dataform Google Cloud 專案 ID。
  • REPOSITORY_ID:Dataform 存放區名稱。
  • REGION:Dataform 存放區所在的區域。
  • COMPILATION_RESULT:您要用於這個工作流程叫用作業的編譯結果名稱。
  • GIT_COMMITISH:您要使用的程式碼版本的遠端 Git 存放區中的 Git 修訂版本,例如分支或 Git SHA。
  • COMPILATION_RESULT:您要用於這個工作流程叫用作業的編譯結果名稱。

新增編譯設定參數

您可以將其他編譯設定參數新增至 create_compilation_result Airflow DAG 物件。如要進一步瞭解可用的參數,請參閱 CodeCompilationConfig Dataform API 參考資料

  • 如要將編譯設定參數新增至 create_compilation_result Airflow DAG 物件,請按照下列格式將所選參數新增至 code_compilation_config 欄位:

        create_compilation_result = DataformCreateCompilationResultOperator(
            task_id="create_compilation_result",
            project_id=PROJECT_ID,
            region=REGION,
            repository_id=REPOSITORY_ID,
            compilation_result={
                "git_commitish": GIT_COMMITISH,
                "code_compilation_config": { "PARAMETER": "PARAMETER_VALUE"}
            },
        )
    

    更改下列內容:

    • PROJECT_ID:您的 Dataform Google Cloud 專案 ID。
    • REPOSITORY_ID:Dataform 存放區名稱。
    • REGION:Dataform 存放區所在的區域。
    • GIT_COMMITISH:您要使用的程式碼版本的遠端 Git 存放區中的 Git 修訂版本,例如分支或 Git SHA。
    • PARAMETER:所選 CodeCompilationConfig 參數。您可以新增多個參數。
    • PARAMETER_VALUE:所選參數的值。

以下程式碼範例顯示已新增至 create_compilation_result Airflow DAG 物件的 defaultDatabase 參數:

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": REMOTE_BRANCH,
            "code_compilation_config": { "default_database": "my-custom-gcp-project"}
        },
    )

新增工作流程叫用設定參數

您可以將其他工作流程叫用設定參數新增至 create_workflow_invocation Airflow DAG 物件。如要進一步瞭解可用的參數,請參閱 InvocationConfig Dataform API 參考資料

  • 如要將工作流程叫用設定參數新增至 create_workflow_invocation Airflow DAG 物件,請按照下列格式將所選參數新增至 invocation_config 欄位:

        create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
            task_id='create_workflow_invocation',
            project_id=PROJECT_ID,
            region=REGION,
            repository_id=REPOSITORY_ID,
            workflow_invocation={
                "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
                "invocation_config": { "PARAMETER": PARAMETER_VALUE }
            },
        )
    
    

    更改下列內容:

    • PROJECT_ID:您的 Dataform Google Cloud 專案 ID。
    • REPOSITORY_ID:Dataform 存放區名稱。
    • REGION:Dataform 存放區所在的區域。
    • PARAMETER:所選 InvocationConfig 參數。您可以新增多個參數。
    • PARAMETER_VALUE:所選參數的值。

以下程式碼範例顯示已新增至 create_workflow_invocation Airflow DAG 物件的 includedTags[]transitiveDependenciesIncluded 參數:

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}",
            "invocation_config": { "included_tags": ["daily"], "transitive_dependencies_included": true }
        },
    )

後續步驟