協調管道

本頁面說明如何使用 Cloud Composer 和觸發事件進行管道自動化調度管理。Cloud Data Fusion 建議您使用 Cloud Composer 來調度管道。如果您需要更簡單的自動化調度管理方式,請使用觸發條件。

Composer

使用 Cloud Composer 自動化調度管理管道

透過 Cloud Composer 在 Cloud Data Fusion 中自動化調度管理管道執行作業,可享有下列優點:

  • 集中管理工作流程:統一管理多個 Cloud Data Fusion 管道的執行作業。
  • 依附元件管理:為確保正確的執行順序,請定義管道之間的依附元件。
  • 監控和快訊:Cloud Composer 提供監控功能,並在發生錯誤時發出快訊。
  • 與其他服務整合:Cloud Composer 可讓您自動化調度管理跨 Cloud Data Fusion 和其他Google Cloud 服務的工作流程。

如要使用 Cloud Composer 自動化調度管理 Cloud Data Fusion 管道,請按照下列程序操作:

  1. 設定 Cloud Composer 環境。

    • 建立 Cloud Composer 環境。如果沒有,請在 Google Cloud 專案中佈建環境。這個環境就是自動化調度管理工作區。
    • 授予權限。請確認 Cloud Composer 服務帳戶具備存取 Cloud Data Fusion 的必要權限 (例如啟動、停止及列出管道的權限)。
  2. 定義自動化調度管理的無向有向圖 (DAG)。

    • 建立 DAG:在 Cloud Composer 中建立 DAG,定義 Cloud Data Fusion 管道的自動化調度管理工作流程。
    • Cloud Data Fusion 運算子:在 DAG 中使用 Cloud Composer 的 Cloud Data Fusion 運算子。這些運算子可讓您以程式輔助方式與 Cloud Data Fusion 互動。

Cloud Data Fusion 運算子

Cloud Data Fusion 管道自動化調度管理功能提供下列運算子:

CloudDataFusionStartPipelineOperator

根據 Cloud Data Fusion 管道的 ID 觸發執行作業。這個方法包含下列參數:

  • pipeline ID
  • 位置 (Google Cloud region)
  • Pipeline 命名空間
  • 執行階段引數 (選填)
  • 等待完成 (選用)
  • 逾時 (選填)
CloudDataFusionStopPipelineOperator

可讓您停止執行中的 Cloud Data Fusion 管道。

CloudDataFusionDeletePipelineOperator

刪除 Cloud Data Fusion 管道。

建構 DAG 工作流程

建構 DAG 工作流程時,請考量以下事項:

  • 定義依附元件:使用 DAG 結構定義工作之間的依附元件。舉例來說,您可以建立工作,等待某個命名空間中的管道順利完成,再觸發另一個命名空間中的管道。
  • 排程:排定 DAG 在特定時間間隔執行 (例如每天或每小時),或將其設為手動觸發。

詳情請參閱 Cloud Composer 總覽

觸發條件

使用觸發條件自動調度管道

在完成一或多個上游管道 (成功、失敗或任何指定條件) 後,Cloud Data Fusion 觸發條件可讓您自動執行下游管道。

觸發條件在下列工作中相當實用:

  • 清理資料一次,然後提供給多個下游管道使用。
  • 在管道之間共用資訊,例如執行階段引數和外掛程式設定。這項工作稱為酬載設定
  • 使用一組動態管道,以每小時、每天、每週或每月為單位執行資料,而非使用靜態管道,因為每次執行時都必須更新。

舉例來說,您有一個資料集,其中包含公司出貨的所有資訊。您想根據這些資料回答幾個業務問題。為此,您需要建立一個管道來清理出貨相關的原始資料,這個管道稱為「Shipments Data Cleaning」。接著,您可以建立第二個管道「Delayed Shipments USA」,讀取已清除的資料,並找出在美國境內延遲超過指定門檻的貨件。上游Shipments Data Cleaning 管道成功完成後,即可觸發 Delayed Shipments USA 管道。

此外,由於下游管道會使用上游管道的輸出內容,因此您必須指定下游管道在使用此觸發條件執行時,也要接收要讀取的輸入目錄 (也就是上游管道產生輸出內容的目錄)。這個程序稱為「傳遞酬載設定」,您可以使用執行階段引數定義這項程序。這項功能可讓您建立一組動態管道,以小時、日、週或月資料執行 (而非靜態管道,因為每次執行時都必須更新)。

如要透過觸發條件協調管道,請按照下列程序操作:

  1. 建立上游和下游管道。

    • 在 Cloud Data Fusion Studio 中,設計並部署組成調度鏈結的管道。
    • 請考量哪個管道的完成作業會在工作流程中啟用下一個管道 (下游)。
  2. 選用:為上游管道傳遞執行階段引數。

  3. 在下游管道中建立 inbound 觸發條件。

    • 在 Cloud Data Fusion Studio 中,前往「List」頁面。在「已部署」分頁中,按一下下游管道名稱。該管道的「部署」檢視畫面隨即顯示。
    • 按一下頁面左側中間的「Inbound triggers」(輸入觸發事件)。畫面上會顯示可用的管道清單。
    • 按一下上游管道。選取一或多個上游管道完成狀態 (「成功」、「失敗」或「停止」),做為下游管道應執行時的條件。
    • 如果您希望上游管道與下游管道共用資訊 (稱為酬載設定),請按一下「觸發設定」,然後按照步驟將酬載設定做為執行階段引數傳遞。否則請按一下「啟用觸發條件」
  4. 測試觸發條件。

    • 啟動上游管道的執行作業。
    • 如果觸發事件設定正確,則會根據所設定的條件,在上游管道完成後自動執行下游管道。

將酬載設定傳遞為執行階段引數

酬載設定可讓您將上游管道中的資訊分享給下游管道。這類資訊包括輸出目錄、資料格式或管道執行日期。隨後,下游管道會使用這項資訊做出決策,例如決定要讀取哪個正確的資料集。

如要將資訊從上游管道傳遞至下游管道,請使用上游管道中執行階段引數或任何外掛程式的設定值,設定下游管道的執行階段引數。

每當下游管道觸發並執行時,其酬載設定都會使用觸發下游管道的上游管道特定執行作業的執行階段引數來設定。

如要將酬載設定做為執行階段引數傳遞,請按照下列步驟操作:

  1. 接著,請點選「觸發條件設定」,系統就會顯示您先前設定的上游管道的任何執行階段引數。選擇要在觸發事件執行時,從上游管道傳遞至下游管道的執行階段引數。
  2. 按一下「Plugin config」分頁標籤,即可查看觸發時會從上游管道傳遞至下游管道的項目清單。
  3. 按一下「設定並啟用觸發條件」