Dataflow 可透過自動水平調度資源功能,為工作選擇適當的工作站執行個體數量,並視需要新增或移除工作站。Dataflow 會根據工作站的平均 CPU 使用率和管道的平行處理程度調度資源。管道的平行處理是估算值,代表在任何指定時間最有效率地處理資料所需的執行緒數量。
批次和串流管道均支援自動水平調度資源。
批次自動調度資源
所有批次管道都會預設啟用水平自動調度資源功能。 Dataflow 會根據管道每個階段的預估工作總量,自動選擇工作站數量。這項預估值取決於輸入大小和目前的輸送量。Dataflow 會根據執行進度,每 30 秒重新評估工作量。隨著預估工作總量增加或減少,Dataflow 會動態調高或調低工作站數量。
工作站數量與工作量呈次線性關係。舉例來說,工作量加倍的工作,工作人員不會加倍。
如果發生下列任一情況,Dataflow 會維持或減少工作站數量,以節省閒置資源:
- 工作站的平均 CPU 使用率低於 5%。
- 由於無法平行處理的工作 (例如壓縮檔導致資料無法分割,或 I/O 模組未分割),平行處理受到限制。
- 平行處理程度是固定的,例如寫入 Cloud Storage 中的現有檔案時。
如要設定工作站數量上限,請設定 --maxNumWorkers
管道選項。預設值為 2,000
。如要設定工作站數量下限,請設定 --dataflow-service-options=min_num_workers
服務選項。這些標記為選用項目。
串流自動調度資源
對於串流工作,Dataflow 可透過自動水平調度資源功能,根據負載變化及資源使用情況調整工作站數量。
對於使用 Streaming Engine 的串流工作,系統預設會啟用水平自動調度資源功能。如要為未使用 Streaming Engine 的串流工作啟用水平自動調度資源,請在啟動管道時設定下列管道選項:
Java
--autoscalingAlgorithm=THROUGHPUT_BASED
--maxNumWorkers=MAX_WORKERS
將 MAX_WORKERS 改成工作站執行個體數量上限。
Python
--autoscaling_algorithm=THROUGHPUT_BASED
--max_num_workers=MAX_WORKERS
將 MAX_WORKERS 改成工作站執行個體數量上限。
Go
--autoscaling_algorithm=THROUGHPUT_BASED
--max_num_workers=MAX_WORKERS
將 MAX_WORKERS 改成工作站執行個體數量上限。
如要設定工作站數量下限,請設定 --dataflow-service-options=min_num_workers
服務選項。設定這個值後,水平自動調度資源就不會將工作站數量調度至低於指定數量。這個旗標是選用的。
串流工作執行期間,您可以使用執行中工作更新,更新工作站數量下限和上限。如要調整設定,請設定 min-num-workers
和 max-num-workers
旗標。
詳情請參閱「更新自動調度資源範圍」。
停用自動水平調度資源
如要停用水平自動調度資源,請在執行工作時設定下列管道選項。
Java
--autoscalingAlgorithm=NONE
如果停用自動水平調度資源,Dataflow 會根據 --numWorkers
選項設定工作站數量。
Python
--autoscaling_algorithm=NONE
如果停用自動水平調度資源,Dataflow 會根據 --num_workers
選項設定工作站數量。
Go
--autoscaling_algorithm=NONE
如果停用自動水平調度資源,Dataflow 會根據 --num_workers
選項設定工作站數量。
自訂來源
如果您建立自訂資料來源,可以實作下列方法,向水平自動調度資源演算法提供更多資訊,藉此提升效能:
Java
有界來源
- 在
BoundedSource
子類別中,實作getEstimatedSizeBytes
方法。Dataflow 服務會使用getEstimatedSizeBytes
計算一開始要提供給您管道的工作站數量。 - 在
BoundedReader
子類別中,實作getFractionConsumed
方法。Dataflow 服務會使用getFractionConsumed
追蹤讀取進度,並彙整出讀取期間應使用的正確工作站數量。
無邊界來源
來源必須告知 Dataflow 服務待處理作業的數量。待處理作業是指尚未被資料來源處理過的預估輸入位元組。如要告知服務待處理作業的數量,請在您的 UnboundedReader
類別中實作下列其中一個方法。
getSplitBacklogBytes()
- 來源的目前組別待處理作業。服務會匯總所有組別的待處理作業。getTotalBacklogBytes()
- 所有組別的全域待處理作業。在某些情況下,系統無法提供個別組別的待處理作業數量,而只能提供所有組別的待處理作業總數。只有第一個組別 (組別 ID「0」) 需要提供待處理作業總數。
Apache Beam 存放區包含實作 UnboundedReader
類別自訂來源的多個範例。
Python
有界來源
- 在
BoundedSource
子類別中,實作estimate_size
方法。Dataflow 服務會使用estimate_size
計算一開始要提供給您管道的工作站數量。 - 在
RangeTracker
子類別中,實作fraction_consumed
方法。Dataflow 服務會使用fraction_consumed
追蹤讀取進度,並彙整出讀取期間應使用的正確工作站數量。
Go
有界來源
- 在
RangeTracker
中實作GetProgress()
方法。Dataflow 服務會使用GetProgress
追蹤讀取進度,並彙整出讀取期間應使用的正確工作站數量。
限制
- 在執行 Dataflow Prime 的工作中,水平自動調度資源會在垂直自動調度資源期間停用,並在垂直自動調度資源結束後最多停用 10 分鐘。詳情請參閱「對自動水平調度資源的影響」。
- 如果管道未使用 Dataflow Shuffle,Dataflow 可能無法有效縮減工作站,因為工作站可能已將重組資料儲存在本機磁碟中。
- PeriodicImpulse如果管道使用舊版 SDK 中的
PeriodicImpulse
,Dataflow 工作站不會如預期縮減。