使用工作建構工具建立自訂工作

工作建立工具可讓您建立自訂批次和串流 Dataflow 工作。您也可以將工作建構工具工作儲存為 Apache Beam YAML 檔案,以便共用及重複使用。

建立新的管道

如要在工作建構工具中建立新管道,請按照下列步驟操作:

  1. 前往 Google Cloud 控制台的「Jobs」(工作) 頁面。

    前往「Jobs」(工作) 頁面

  2. 按一下「透過建構工具建立工作」

  3. 在「工作名稱」部分,輸入工作名稱。

  4. 選取「批次」或「串流」。

  5. 如果選取「串流」,請選取視窗模式。然後輸入視窗規格,如下所示:

    • 固定時間區間:以秒為單位輸入時間區間大小。
    • 滑動視窗:輸入視窗大小和視窗週期 (以秒為單位)。
    • 工作階段時間範圍:輸入工作階段間隔 (以秒為單位)。

    如要進一步瞭解視窗,請參閱「視窗和視窗函式」。

接著,請按照下列各節的說明,將來源、轉換和接收器新增至管道。

將來源新增至管道

管道至少須有一個來源。工作建構工具一開始會填入空白來源。如要設定來源,請執行下列步驟:

  1. 在「來源名稱」方塊中,輸入來源名稱或使用預設名稱。執行工作時,名稱會顯示在工作圖表中。

  2. 在「來源類型」清單中,選取資料來源類型。

  3. 視來源類型而定,提供額外的設定資訊。 舉例來說,如果您選取 BigQuery,請指定要從哪個資料表讀取資料。

    如果選取 Pub/Sub,請指定訊息結構定義。輸入要從 Pub/Sub 訊息讀取的每個欄位名稱和資料類型。管線會捨棄結構定義中未指定的任何欄位。

  4. 選用:針對部分來源類型,您可以按一下「預覽來源資料」來預覽來源資料。

如要在管道中新增其他來源,請按一下「新增來源」。如要合併多個來源的資料,請在管道中新增 SQLJoin 轉換。

在管道中新增轉換

視需要為管道新增一或多個轉換。您可以使用下列轉換指令,操控、匯總或彙整來源和其他轉換作業中的資料:

轉換類型 說明 Beam YAML 轉換作業資訊
篩選 (Python) 使用 Python 運算式篩選記錄。
SQL 轉換 透過 SQL 陳述式處理記錄或彙整多項輸入內容。
對應欄位 (Python) 運用 Python 運算式和函式,新增欄位或重新對應整份記錄。
對應欄位 (SQL) 透過 SQL 運算式新增或對應記錄欄位。
YAML 轉換作業:
  1. AssertEqual
  2. AssignTimestamps
  3. 合併
  4. 分割
  5. 篩選器
  6. Flatten
  7. 加入
  8. LogForTesting
  9. MLTransform
  10. MapToFields
  11. PyTransform
  12. WindowInfo

使用 Beam YAML SDK 中的任何轉換作業。

YAML 轉換作業設定:提供 YAML 轉換作業設定參數,做為 YAML 對應。建立 Beam YAML 轉換之後,系統會將鍵/值組合填入該轉換作業的設定專區。如要查看各轉換作業類型支援的設定參數,請參閱 Beam YAML 轉換作業說明文件。設定參數範例:

合併
group_by:
combine:
加入
type:
equalities:
fields:
記錄 將記錄記錄至工作的工作站記錄。
分組依據 使用 count()sum() 等函式合併記錄。
加入 彙整相等欄位中的多項輸入內容。
Explode 攤平陣列欄位來分割記錄。

如要新增轉換作業,請按照下列步驟操作:

  1. 按一下「新增轉換」

  2. 在「轉換」名稱方塊中,輸入轉換名稱或使用預設名稱。執行工作時,名稱會顯示在工作圖表中。

  3. 在「轉換類型」清單中,選取轉換類型。

  4. 視轉換類型而定,提供其他設定資訊。舉例來說,如果您選取「篩選條件 (Python)」,請輸入要用做篩選條件的 Python 運算式。

  5. 選取轉換作業的輸入步驟。輸入步驟是來源或轉換,其輸出內容會提供這個轉換的輸入內容。

將接收器新增至管道

管道至少須有一個接收器。工作建構工具一開始會填入空白的接收器。如要設定接收器,請執行下列步驟:

  1. 在「Sink name」(接收器名稱) 方塊中輸入接收器名稱,或使用預設名稱。 執行工作時,名稱會顯示在工作圖表中。

  2. 在「接收器類型」清單中,選取接收器類型。

  3. 視接收器類型而定,提供額外的設定資訊。 舉例來說,如果您選取 BigQuery 接收器,請選取要寫入的 BigQuery 資料表。

  4. 選取接收器的輸入步驟。輸入步驟是來源或轉換,其輸出內容會提供這個轉換的輸入內容。

  5. 如要將其他接收器新增至管道,請按一下「新增接收器」

執行管道

如要從作業建構工具執行管道,請完成下列步驟:

  1. 選用:設定 Dataflow 工作選項。如要展開「Dataflow options」部分,請按一下 展開箭頭。

  2. 按一下「Run job」(執行工作)。作業建構工具會導覽至已提交作業的作業圖。您可以使用工作圖表監控工作狀態。

啟動前請先驗證管道

對於設定複雜的管道 (例如 Python 篩選器和 SQL 運算式),建議您先檢查管道設定是否有語法錯誤,再啟動管道。如要驗證管道語法,請完成下列步驟:

  1. 按一下「驗證」開啟 Cloud Shell,然後啟動驗證服務。
  2. 按一下「開始驗證」
  3. 如果驗證期間發現錯誤,系統會顯示紅色驚嘆號。
  4. 修正偵測到的錯誤,然後按一下「驗證」,確認修正內容。如果沒有發現錯誤,系統會顯示綠色勾號。

透過 gcloud CLI 執行

您也可以使用 gcloud CLI 執行 Beam YAML pipeline。如要使用 gcloud CLI 執行工作建構工具 pipeline,請按照下列步驟操作:

  1. 按一下「儲存 YAML」開啟「儲存 YAML」視窗。

  2. 執行下列其中一項動作:

    • 如要儲存至 Cloud Storage,請輸入 Cloud Storage 路徑,然後按一下「儲存」
    • 如要下載本機檔案,請按一下「下載」
  3. 在殼層或終端機中執行下列指令:

      gcloud dataflow yaml run my-job-builder-job --yaml-pipeline-file=YAML_FILE_PATH
    

    請將 YAML_FILE_PATH 改成 YAML 檔案的路徑,可以是本機路徑或 Cloud Storage 路徑。

後續步驟