自動水平調度資源

Dataflow 可透過自動水平調度資源功能,為工作選擇適當的工作站執行個體數量,並視需要新增或移除工作站。Dataflow 會根據工作站的平均 CPU 使用率和管道的平行處理程度調度資源。管道的平行處理是估算值,代表在任何指定時間最有效率地處理資料所需的執行緒數量。

批次和串流管道均支援自動水平調度資源。

批次自動調度資源

所有批次管道都會預設啟用水平自動調度資源功能。 Dataflow 會根據管道每個階段的預估工作總量,自動選擇工作站數量。這項預估值取決於輸入大小和目前的輸送量。Dataflow 會根據執行進度,每 30 秒重新評估工作量。隨著預估工作總量增加或減少,Dataflow 會動態調高或調低工作站數量。

工作站數量與工作量呈次線性關係。舉例來說,工作量加倍的工作,工作人員不會加倍。

如果發生下列任一情況,Dataflow 會維持或減少工作站數量,以節省閒置資源:

  • 工作站的平均 CPU 使用率低於 5%。
  • 由於無法平行處理的工作 (例如壓縮檔導致資料無法分割,或 I/O 模組未分割),平行處理受到限制。
  • 平行處理程度是固定的,例如寫入 Cloud Storage 中的現有檔案時。

如要設定工作站數量上限,請設定 --maxNumWorkers 管道選項。預設值為 2,000。如要設定工作站數量下限,請設定 --dataflow-service-options=min_num_workers 服務選項。這些標記為選用項目。

串流自動調度資源

對於串流工作,Dataflow 可透過自動水平調度資源功能,根據負載變化及資源使用情況調整工作站數量。

對於使用 Streaming Engine 的串流工作,系統預設會啟用水平自動調度資源功能。如要為未使用 Streaming Engine 的串流工作啟用水平自動調度資源,請在啟動管道時設定下列管道選項

Java

--autoscalingAlgorithm=THROUGHPUT_BASED
--maxNumWorkers=MAX_WORKERS

MAX_WORKERS 改成工作站執行個體數量上限。

Python

--autoscaling_algorithm=THROUGHPUT_BASED
--max_num_workers=MAX_WORKERS

MAX_WORKERS 改成工作站執行個體數量上限。

Go

--autoscaling_algorithm=THROUGHPUT_BASED
--max_num_workers=MAX_WORKERS

MAX_WORKERS 改成工作站執行個體數量上限。

如要設定工作站數量下限,請設定 --dataflow-service-options=min_num_workers 服務選項。設定這個值後,水平自動調度資源就不會將工作站數量調度至低於指定數量。這個旗標是選用的。

串流工作執行期間,您可以使用執行中工作更新,更新工作站數量下限和上限。如要調整設定,請設定 min-num-workersmax-num-workers 旗標。 詳情請參閱「更新自動調度資源範圍」。

停用自動水平調度資源

如要停用水平自動調度資源,請在執行工作時設定下列管道選項

Java

--autoscalingAlgorithm=NONE

如果停用自動水平調度資源,Dataflow 會根據 --numWorkers 選項設定工作站數量。

Python

--autoscaling_algorithm=NONE

如果停用自動水平調度資源,Dataflow 會根據 --num_workers 選項設定工作站數量。

Go

--autoscaling_algorithm=NONE

如果停用自動水平調度資源,Dataflow 會根據 --num_workers 選項設定工作站數量。

自訂來源

如果您建立自訂資料來源,可以實作下列方法,向水平自動調度資源演算法提供更多資訊,藉此提升效能:

Java

有界來源

  • BoundedSource 子類別中,實作 getEstimatedSizeBytes 方法。Dataflow 服務會使用 getEstimatedSizeBytes 計算一開始要提供給您管道的工作站數量。
  • BoundedReader 子類別中,實作 getFractionConsumed 方法。Dataflow 服務會使用 getFractionConsumed 追蹤讀取進度,並彙整出讀取期間應使用的正確工作站數量。

無邊界來源

來源必須告知 Dataflow 服務待處理作業的數量。待處理作業是指尚未被資料來源處理過的預估輸入位元組。如要告知服務待處理作業的數量,請在您的 UnboundedReader 類別中實作下列其中一個方法。

  • getSplitBacklogBytes() - 來源的目前組別待處理作業。服務會匯總所有組別的待處理作業。
  • getTotalBacklogBytes() - 所有組別的全域待處理作業。在某些情況下,系統無法提供個別組別的待處理作業數量,而只能提供所有組別的待處理作業總數。只有第一個組別 (組別 ID「0」) 需要提供待處理作業總數。

Apache Beam 存放區包含實作 UnboundedReader 類別自訂來源的多個範例

Python

有界來源

  • BoundedSource 子類別中,實作 estimate_size 方法。Dataflow 服務會使用 estimate_size 計算一開始要提供給您管道的工作站數量。
  • RangeTracker 子類別中,實作 fraction_consumed 方法。Dataflow 服務會使用 fraction_consumed 追蹤讀取進度,並彙整出讀取期間應使用的正確工作站數量。

Go

有界來源

  • RangeTracker 中實作 GetProgress() 方法。Dataflow 服務會使用 GetProgress 追蹤讀取進度,並彙整出讀取期間應使用的正確工作站數量。

限制

  • 在執行 Dataflow Prime 的工作中,水平自動調度資源會在垂直自動調度資源期間停用,並在垂直自動調度資源結束後最多停用 10 分鐘。詳情請參閱「對自動水平調度資源的影響」。
  • 如果管道未使用 Dataflow Shuffle,Dataflow 可能無法有效縮減工作站,因為工作站可能已將重組資料儲存在本機磁碟中。
  • PeriodicImpulse如果管道使用舊版 SDK 中的 PeriodicImpulse,Dataflow 工作站不會如預期縮減。

後續步驟