管道生命週期

本頁將概述管道生命週期,從管道程式碼到 Dataflow 工作。

本頁面說明下列概念:

  • 執行圖的用途,以及 Apache Beam 管道如何成為 Dataflow 工作
  • Dataflow 如何處理錯誤
  • Dataflow 如何自動平行處理管道中的處理邏輯,並將處理邏輯分布到執行工作的工作站
  • Dataflow 可能進行的工作最佳化

執行圖

執行 Dataflow 管道時,Dataflow 會以建構 Pipeline 物件的程式碼建立執行圖,包括所有轉換和相關聯的處理函式,例如 DoFn 物件。這是管道執行圖,這個階段稱為「圖形建構時間」

在建構圖形期間,Apache Beam 會在本機執行管道程式碼主要進入點的程式碼,並在呼叫來源、接收器或轉換步驟時停止,然後將這些呼叫轉換為圖形的節點。因此,管道進入點中的一段程式碼 (Java 和 Go main 方法或 Python 指令碼的頂層) 會在本機上執行管道的機器上執行。在 DoFn 物件的方法中宣告的相同程式碼,會在 Dataflow 工作站中執行。

舉例來說,Apache Beam SDK 隨附的 WordCount 範例包含一系列轉換,可讀取、擷取、計數、格式化及寫入文字集合中的個別字詞,以及計算每個字詞的出現次數。下圖說明 WordCount 管道中的轉換如何擴展至執行圖:

WordCount 程式範例中的轉換擴展成了執行步驟圖,將由 Dataflow 服務執行。

圖 1:WordCount 範例執行圖

執行圖和您在建構管道時指定的轉換順序通常會有不同。這是因為在代管雲端資源上執行前,Dataflow 服務會先針對執行圖進行各種最佳化和融合。Dataflow 服務在執行管道時會遵循資料相依性,不過,如果各步驟間沒有任何資料相依性,系統就會按任何順序執行步驟。

如要查看 Dataflow 為管道產生的尚未最佳化執行圖,請在 Dataflow 監控介面中選取工作。如要進一步瞭解如何查看作業,請參閱使用 Dataflow 監控介面

在建構圖表期間,Apache Beam 會驗證管道參照的任何資源 (例如 Cloud Storage 值區、BigQuery 資料表,以及 Pub/Sub 主題或訂閱項目) 是否確實存在且可存取。驗證作業是透過對相應服務的標準 API 呼叫完成,因此請務必確保用於執行管道的使用者帳戶與必要服務的連線正常,且已獲授權呼叫服務的 API。將管道提交至 Dataflow 服務前,Apache Beam 也會檢查其他錯誤,並確保管道圖形未包含任何非法作業。

執行圖隨後會轉譯為 JSON 格式,轉譯後的 JSON 執行圖會傳輸到 Dataflow 服務端點。

接著,Dataflow 服務會驗證 JSON 執行圖。只要圖形通過驗證,就會變成 Dataflow 服務上的工作。您可以使用 Dataflow 監控介面查看工作、執行圖、狀態和記錄資訊。

Java

Dataflow 服務會將回應傳送到您執行 Dataflow 程式的機器。這個回應會封裝在物件 DataflowPipelineJob 中,其中包含 Dataflow 工作的 jobId。您可以使用 jobId,透過 Dataflow 監控介面Dataflow 指令列介面來監控、追蹤及解決工作問題。詳情請參閱「DataflowPipelineJob 的 API 參考資料」。

Python

Dataflow 服務會將回應傳送到您執行 Dataflow 程式的機器。這個回應會封裝在物件 DataflowPipelineResult 中,其中包含 Dataflow 工作的 job_id。使用 job_id,透過 Dataflow 監控介面Dataflow 指令列介面來監控、追蹤及解決工作問題。

Go

Dataflow 服務會將回應傳送到您執行 Dataflow 程式的機器。這個回應會封裝在物件 dataflowPipelineResult 中,其中包含 Dataflow 工作的 jobID。使用 jobID,透過 Dataflow 監控介面Dataflow 指令列介面來監控、追蹤及解決工作問題。

您在本機上執行管道時也需要建構圖形,只不過圖形不會轉譯為 JSON,也不會傳輸到服務。系統會在您用來啟動 Dataflow 程式的同一台虛擬機器中,從本機執行圖形。詳情請參閱針對本機執行作業設定 PipelineOptions

錯誤及例外狀況處理

您的管道在處理資料時可能會擲回例外狀況。這些錯誤中有些是暫時性,例如暫時無法存取外部服務。其他錯誤則是永久性,例如已損毀或無法剖析的輸入資料導致錯誤,或在運算期間出現 null 指標。

Dataflow 會以任意組合來處理元素,只要組合中有任何元素擲回錯誤,就會重新嘗試處理整個組合。以批次模式執行時,內含失敗項目的組合可重試 4 次。如果單一組合失敗達四次,管道程序就會完全失敗。以串流模式執行時,內含失敗項目的組合會無限期重試,而這可能會導致管道永久停滯。

如果以批次模式處理,在管道工作完全失敗 (也就是組合重試失敗 4 次) 之前,您可能會先遇到大量的個別錯誤。舉例來說,假設您的管道程式要處理 100 個組合,Dataflow 會產生數百項個別失敗,直到單一組合「達到 4 次失敗」這個條件之後才會結束。

啟動工作站錯誤 (例如無法在工作站上安裝套件) 是暫時性的。這種情況會導致無限期重試,並可能造成管道永久停滯。

平行處理和分布

Dataflow 服務會自動平行處理管道中的處理邏輯,並將處理邏輯分布到您指派來執行工作的工作站。Dataflow 使用程式設計模型中的抽象層來代表平行處理函式。舉例來說,管道中的 ParDo 轉換會讓 Dataflow 自動將以 DoFn 物件表示的處理程式碼,分配到多個工作站平行執行。

作業平行處理分為兩種類型:

  • 水平平行處理是指管道資料分割後,由多個工作站同時處理。Dataflow 執行階段環境是由分散式工作站集區提供支援。集區中的工作站越多,管道的潛在平行處理能力就越高,但這類設定的成本也較高。理論上,水平平行處理沒有上限。不過,為盡量提升整個車隊的資源使用率,Dataflow 會將工作站集區限制為 4000 個工作站。

  • 垂直平行處理是指管道資料分割後,由同一工作站上的多個 CPU 核心處理。每個工作站都由 Compute Engine VM 驅動。VM 可以執行多個程序,充分運用所有 CPU 核心。核心數較多的 VM 垂直平行處理潛力較高,但這項設定會導致成本增加。核心數量越多,記憶體用量通常就會增加,因此核心數量通常會與記憶體大小一併調整。由於電腦架構的實體限制,垂直平行處理的上限遠低於水平平行處理的上限。

管理平行處理

根據預設,Dataflow 會自動管理工作平行處理程序。 Dataflow 會監控工作的執行階段統計資料 (例如 CPU 和記憶體用量),以判斷如何調度工作。視工作設定而定,Dataflow 可以水平調度工作 (稱為自動水平調度資源),或垂直調度工作 (稱為垂直調度資源)。自動調度平行處理作業可最佳化作業成本和作業效能。

為提升工作效能,Dataflow 也會從內部最佳化管道。常見的最佳化方式包括融合最佳化合併最佳化。透過融合管道步驟,Dataflow 可消除與分散式系統中協調步驟相關的不必要成本,並避免個別執行每個步驟。

影響平行處理的因素

下列因素會影響 Dataflow 作業的平行處理功能。

輸入來源

如果輸入來源不允許平行處理,輸入來源擷取步驟可能會成為 Dataflow 工作中的瓶頸。舉例來說,從單一壓縮文字檔擷取資料時,Dataflow 無法平行處理輸入資料。由於大多數壓縮格式無法在擷取期間任意分割成分片,因此 Dataflow 必須從檔案開頭依序讀取資料。管道的整體輸送量會因管道的非平行部分而減緩。如要解決這個問題,請使用更具延展性的輸入來源。

在某些情況下,步驟融合也會減少平行處理。 如果輸入來源不允許平行處理,且 Dataflow 將資料擷取步驟與後續步驟合併,並將合併後的步驟指派給單一執行緒,整個管道的執行速度可能會變慢。

為避免這種情況,請在輸入來源擷取步驟後插入 Redistribute 步驟。詳情請參閱本文的「防止融合」一節。

預設扇出和資料形狀

單一轉換步驟的預設扇出可能會成為瓶頸,並限制平行處理。舉例來說,具有「高度擴散傳遞功能」的 ParDo 轉換可能會導致融合限制 Dataflow 最佳化運用工作站的能力。在這類作業中,您的輸入集合可能只有相對少數的元素,但 ParDo 會產生元素數量為其數百或數千倍的輸出,隨後則有另一個 ParDo。如果 Dataflow 服務將這些 ParDo 作業都融合在一起,就算中繼 PCollection 包含了更多元素,這個步驟中的平行處理原則也只能侷限於輸入集合的最大項目數。

如需可能的解決方案,請參閱本文的「防止融合」一節。

資料形狀

資料形狀 (無論是輸入資料或中繼資料) 可能會限制平行處理。舉例來說,當自然鍵 (例如城市) 上的 GroupByKey 步驟後接續 mapCombine 步驟時,Dataflow 會合併這兩個步驟。如果主要空間很小 (例如五個城市),且其中一個主要空間非常熱門 (例如大城市),則 GroupByKey步驟輸出內容中的大部分項目都會分配給一個程序。這個程序會成為瓶頸,導致工作速度變慢。

在本例中,您可以將 GroupByKey 步驟結果重新分配到較大的虛擬金鑰空間,而不是使用自然金鑰。在 GroupByKey 步驟和 mapCombine 步驟之間插入 Redistribute 步驟。在 Redistribute 步驟中,建立人工鍵空間 (例如使用 hash 函式),克服資料形狀造成的平行處理限制。

詳情請參閱本文的「防止融合」一節。

輸出接收器

接收器是一種轉換,可將資料寫入外部資料儲存系統,例如檔案或資料庫。在實務上,接收器會以標準 DoFn 物件的形式建立及實作,並用於將 PCollection 具體化至外部系統。在本例中,PCollection 包含最終管道結果。呼叫接收器 API 的執行緒可以平行執行,將資料寫入外部系統。根據預設,執行緒之間不會進行協調。如果沒有中介層緩衝寫入要求和控制流程,外部系統可能會過載,導致寫入輸送量降低。增加平行處理程序來擴充資源,可能會進一步減緩管道速度。

解決這個問題的方法是減少寫入步驟的平行處理。您可以在寫入步驟之前新增 GroupByKey 步驟。GroupByKey 步驟會將輸出資料分組為較小的批次,以減少 RPC 呼叫總數,以及與外部系統的連線。舉例來說,您可以使用 GroupByKey,從 100 萬個資料點中建立 50 個資料點的雜湊空間。

這個方法的缺點是會為平行處理引入硬式編碼限制。另一種做法是在寫入資料時,於接收器中實作指數輪詢。這個選項可提供最低限度的用戶端節流。

監控平行處理

如要監控平行處理,可以使用 Google Cloud 控制台查看偵測到的任何落後者。詳情請參閱「排解批次作業中的落後者」和「排解串流作業中的落後者」。

融合最佳化

JSON 格式的管道執行圖通過驗證後,Dataflow 服務可能會調整圖形以便發揮最佳效能。最佳化做法包括將管道執行圖中的多個步驟或轉換融合成單一步驟。藉由融合步驟,Dataflow 服務就不用將管道中的每個中繼 PCollection 都具體化,可避免耗費記憶體或造成處理負擔,有助節省成本。

雖然您在管道建構中指定的所有轉換都會在服務上執行,但為確保管道工作發揮最佳效率,轉換作業的執行順序可能有所不同,或者有時可能也會併入已融合的大型轉換進行處理。Dataflow 服務會依循執行圖中各步驟之間的資料相依性,不過沒有資料相依性的步驟就會依任何順序執行。

融合示例

下圖以 Java 適用的 Apache Beam SDK 隨附的 WordCount 範例來說明 Dataflow 服務如何最佳化及融合 WordCount 執行圖,以利提升執行效率:

Dataflow 服務將 WordCount 程式範例執行圖最佳化並融合其中的步驟。

圖 2:WordCount 範例的最佳化執行圖

防止融合

有時候,Dataflow 可能無法準確推測融合管道作業的最佳方式,而使得 Dataflow 無法充分運用所有可用的工作站,在這種情況下,您可以使用 Redistribute 轉換,提示 Dataflow 重新分配資料。

如要新增 Redistribute 轉換,請呼叫下列其中一種方法:

  • Redistribute.arbitrarily:表示資料可能不平衡。Dataflow 會選擇最佳演算法來重新分配資料。

  • Redistribute.byKey:表示鍵/值配對的 PCollection 可能不平衡,應根據鍵重新分配。通常,Dataflow 會將單一鍵的所有元素放在同一個工作站執行緒。不過,我們無法保證鍵會共置,且元素會獨立處理。

如果管道包含 Redistribute 轉換,Dataflow 通常會防止合併 Redistribute 轉換前後的步驟,並重組資料,讓 Redistribute 轉換下游的步驟有更理想的平行處理。

監控融合

您可以在 Google Cloud 控制台、使用 gcloud CLI 或 API,存取最佳化圖表和融合階段。

主控台

如要在控制台中查看圖表的融合階段和步驟,請在 Dataflow 工作的「執行詳細資料」分頁中,開啟「階段工作流程」圖表檢視畫面。

如要查看階段融合的元件步驟,請在圖表中按一下融合的階段。在「階段資訊」窗格中,「元件步驟」列會顯示合併的階段。有時,單一複合轉換的部分內容會融合到多個階段。

gcloud

如要使用 gcloud CLI 存取最佳化圖表和融合階段,請執行下列 gcloud 指令:

  gcloud dataflow jobs describe --full JOB_ID --format json

JOB_ID 替換為 Dataflow 工作 ID。

如要擷取相關位元,請將 gcloud 指令的輸出內容透過管道傳送至 jq

gcloud dataflow jobs describe --full JOB_ID --format json | jq '.pipelineDescription.executionPipelineStage\[\] | {"stage_id": .id, "stage_name": .name, "fused_steps": .componentTransform }'

如要查看輸出回應檔案中融合階段的說明,請在 ComponentTransform 陣列中查看 ExecutionStageSummary 物件。

API

如要使用 API 存取最佳化圖表和融合階段,請呼叫 project.locations.jobs.get

如要查看輸出回應檔案中融合階段的說明,請在 ComponentTransform 陣列中查看 ExecutionStageSummary 物件。

合併最佳化

「匯總作業」是大規模資料處理的一個重要概念。匯總會將概念上相差甚遠的資料聚集在一起,因此非常適合用於建立關聯。Dataflow 程式設計模型會以 GroupByKeyCoGroupByKeyCombine 轉換來表示匯總作業。

Dataflow 的匯總作業能合併整個資料集的資料,包括散佈到不同工作站的資料。在這類匯總作業中,最有效率的做法是先盡可能合併本機的資料,然後再去合併不同執行個體的資料。您套用 GroupByKey 或其他匯總轉換時,Dataflow 服務會在進行主要分組作業之前,先在本機自動執行局部合併。

進行局部或多層次合併時,Dataflow 服務會依據管道的資料處理模式是批次或串流來做不同的決定。對於有邊界的資料,服務會以效率為優先,因此會盡可能執行本機合併;對於無邊界的資料,服務會以縮短延遲時間為優先,因此可能不會進行局部合併,因為這會增加延遲。