部署 Dataflow 管道

本文將簡要說明管道部署作業,並重點介紹您可以在已部署的管道上執行的部分作業。

執行管道

建立測試 Apache Beam 管道後,請執行管道。您可以在本機執行管道,測試及偵錯 Apache Beam 管道,也可以在 Dataflow 上執行。Dataflow 是一種資料處理系統,可用於執行 Apache Beam 管道。

在本機上執行

在本機執行管道。

Java

下列程式碼範例取自快速入門指南,說明如何在本地執行 WordCount 管道。詳情請參閱如何在本機執行 Java 管道

在終端機中執行下列指令:

  mvn compile exec:java \
      -Dexec.mainClass=org.apache.beam.examples.WordCount \
      -Dexec.args="--output=counts"
  

Python

下列程式碼範例取自快速入門指南,說明如何在本地執行 WordCount 管道。詳情請參閱如何在本機執行 Python 管道

在終端機中執行下列指令:

python -m apache_beam.examples.wordcount \ --output outputs

Go

下列程式碼範例取自快速入門指南,說明如何在本地執行 WordCount 管道。詳情請參閱如何在本機執行 Go 管道

在終端機中執行下列指令:

    go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputs
  

瞭解如何在本機電腦上使用直接執行器在本機執行管道。

在 Dataflow 上執行

在 Dataflow 上執行管道。

Java

下列程式碼範例取自快速入門指南,說明如何在 Dataflow 上執行 WordCount 管道。詳情請參閱這篇文章,瞭解如何在 Dataflow 上執行 Java 管道。

在終端機中執行下列指令 (來自 word-count-beam 目錄):

  mvn -Pdataflow-runner compile exec:java \
    -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Dexec.args="--project=PROJECT_ID \
    --gcpTempLocation=gs://BUCKET_NAME/temp/ \
    --output=gs://BUCKET_NAME/output \
    --runner=DataflowRunner \
    --region=REGION"
    

更改下列內容:

  • PROJECT_ID:您的 Google Cloud 專案 ID
  • BUCKET_NAME:Cloud Storage bucket 的名稱
  • REGIONDataflow 區域,例如 us-central1

Python

下列程式碼範例取自快速入門指南,說明如何在 Dataflow 上執行 WordCount 管道。詳情請參閱這篇文章,瞭解如何在 Dataflow 上執行 Python 管道。

在終端機中執行下列指令:

python -m apache_beam.examples.wordcount \
    --region DATAFLOW_REGION \
    --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output gs://STORAGE_BUCKET/results/outputs \
    --runner DataflowRunner \
    --project PROJECT_ID \
    --temp_location gs://STORAGE_BUCKET/tmp/

更改下列內容:

  • DATAFLOW_REGION:您要部署 Dataflow 工作的區域,例如 europe-west1

    --region 標記會覆寫中繼資料伺服器、本機用戶端或環境變數中設定的預設地區。

  • STORAGE_BUCKET您先前複製的 Cloud Storage 名稱
  • PROJECT_ID: Google Cloud 專案 ID,您先前已複製

Go

下列程式碼範例取自快速入門指南,說明如何在 Dataflow 上執行 WordCount 管道。詳情請參閱這篇文章,瞭解如何在 Dataflow 上執行 Go 管道。

在終端機中執行下列指令:

  posix-terminal go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output gs://STORAGE_BUCKET/results/outputs \
    --runner dataflow \
    --project PROJECT_ID \
    --region DATAFLOW_REGION \
    --staging_location gs://STORAGE_BUCKET/binaries/
  

更改下列內容:

  • STORAGE_BUCKET:Cloud Storage 值區名稱。
  • PROJECT_ID:專案 ID。 Google Cloud
  • DATAFLOW_REGION:要部署 Dataflow 工作的區域。例如:europe-west1。 如需可用位置清單,請參閱「Dataflow 位置」。 請注意,--region 標記會覆寫中繼資料伺服器、本機用戶端或環境變數中設定的預設地區。

瞭解如何使用 Dataflow 執行器,在 Dataflow 服務上執行管道。

在 Dataflow 上執行管道時,Dataflow 會將 Apache Beam 管道程式碼轉換為 Dataflow 工作。Dataflow 會完全代管 Compute EngineCloud Storage 等服務,以執行 Dataflow 工作,並自動啟動及停用必要資源。 Google Cloud 如要進一步瞭解 Dataflow 如何將 Apache Beam 程式碼轉換為 Dataflow 工作,請參閱「管道生命週期」。

管道驗證

在 Dataflow 上執行管道時,Dataflow 會在啟動工作前,對管道執行驗證測試。如果驗證測試發現管道有問題,Dataflow 會提早讓工作提交失敗。在工作記錄中,Dataflow 會加入含有下列文字的訊息。每則訊息也會提供驗證結果的詳細資料,以及解決問題的說明。

The preflight pipeline validation failed for job JOB_ID.

執行的驗證測試取決於 Dataflow 作業使用的資源和服務。

  • 如果專案已啟用 Service Usage API,管道驗證測試會檢查執行 Dataflow 作業所需的服務是否已啟用。
  • 如果專案已啟用 Cloud Resource Manager API,管道驗證測試會檢查您是否具備執行 Dataflow 工作所需的專案層級設定。

如要進一步瞭解如何啟用服務,請參閱「啟用及停用服務」。

如要瞭解如何解決管道驗證期間發現的權限問題,請參閱「管道驗證失敗」一文。

如要覆寫管道驗證並啟動含有驗證錯誤的工作,請使用下列管道服務選項

Java

--dataflowServiceOptions=enable_preflight_validation=false

Python

--dataflow_service_options=enable_preflight_validation=false

Go

--dataflow_service_options=enable_preflight_validation=false

設定管道選項

您可以在 Apache Beam 管道程式碼中設定管道選項,藉此控管 Dataflow 執行工作時的某些做法。舉例來說,您可以使用管道選項,設定管道要在工作站虛擬機器、Dataflow 服務後端或本機上執行。

管理管道依附元件

許多 Apache Beam 管道都能使用預設的 Dataflow 執行階段環境執行。不過,部分資料處理用途適合使用其他程式庫或類別。在這種情況下,您可能需要管理管道依附元件。如要進一步瞭解如何管理依附元件,請參閱「在 Dataflow 中管理管道依附元件」。

監控工作

Dataflow 提供多種工具,方便您查看工作狀況,例如 Dataflow 監控介面Dataflow 指令列介面

存取工作站 VM

您可以使用Google Cloud 控制台查看特定管道的 VM 執行個體。在主控台中,您可以使用 SSH 存取每個執行個體。 不過,工作完成或失敗後,Dataflow 服務會自動關閉並清除 VM 執行個體。

工作最佳化

除了管理 Google Cloud 資源,Dataflow 還能自動執行及最佳化分布式平行處理作業的許多層面。

平行處理和分布

Dataflow 會自動分割資料,並將工作站程式碼分布到 Compute Engine 執行個體進行平行處理。詳情請參閱平行化和分配

融合和合併最佳化

Dataflow 會使用您的管道程式碼建立一個代表管道的 PCollection 和轉換作業的執行圖,並對這個執行圖進行最佳化,以提高效能和資源的使用效率。Dataflow 也會自動對成本高昂的作業 (例如資料匯總) 進行最佳化。詳情請參閱「融合最佳化」和「合併最佳化」。

自動微調功能

Dataflow 服務包含多項能即時調整資源分配和資料分割的功能。這些功能可讓 Dataflow 盡可能快速又有效率地執行您的工作。這些功能包括:

Streaming Engine

根據預設,Dataflow 管道執行器完全是在工作站虛擬機器上執行串流管道步驟,並且使用工作站的 CPU、記憶體和 Persistent Disk 的儲存空間。Dataflow 的 Streaming Engine 可以將管道執行作業從工作站 VM 移至 Dataflow 服務後端。詳情請參閱「Streaming Engine」。

Dataflow 彈性資源排程

Dataflow FlexRS 會使用進階排程技術Dataflow Shuffle服務,並結合先占虛擬機器 (VM) 執行個體和一般 VM,藉此減少批次處理費用。如果 Compute Engine 在系統事件期間停止先占 VM 執行個體,Dataflow 會透過平行執行先占 VM 與一般 VM 來改善使用者體驗。當 Compute Engine 先占您的先占 VM 時,FlexRS 可協助確保管道繼續執行原有的工作,使其不致遺失。如要進一步瞭解 FlexRS,請參閱「在 Dataflow 中使用彈性資源排程」一文。

Dataflow 受防護的 VM

自 2022 年 6 月 1 日起,Dataflow 服務會為所有工作站使用 Shielded VM。如要進一步瞭解受防護的 VM 功能,請參閱「受防護的 VM」。