在輸入資料量大的串流管道中,通常需要權衡成本和延遲時間。為維持低延遲,Dataflow 必須在流量增加時新增工作站。另一個因素是管道應以多快的速度擴大或縮減規模,以因應輸入資料速率的變化。
Dataflow 自動調度程式的預設設定適用於許多工作負載。不過,您可能會想針對特定情境調整這項行為。舉例來說,為了降低成本,您或許可以接受較高的平均延遲時間,或是希望 Dataflow 在流量暴增時更快擴充。
如要最佳化水平自動調度功能,可以調整下列參數:
- 自動調度資源範圍:要分配的工作站數量下限與上限。
- 工作站使用率提示:工作站的目標 CPU 使用率。
- 工作站平行處理量提示:工作站的目標平行處理量。
設定自動調度資源範圍
建立新的串流工作時,您可以設定工作站的初始數量和工作站數量上限。如要這麼做,請指定下列管道選項:
Java
--numWorkers
:管道開始執行時可用的初始工作站數量--maxNumWorkers
:管道可用的工作站數量上限
Python
--num_workers
:管道開始執行時可用的初始工作站數量--max_num_workers
:管道可用的工作站數量上限
Go
--num_workers
:管道開始執行時可用的初始工作站數量--max_num_workers
:管道可用的工作站數量上限
如果串流工作使用 Streaming Engine,則 --maxNumWorkers
旗標為選用。預設值為 100
。如果串流工作未使用 Streaming Engine,啟用自動水平調度資源功能時,就必須提供 --maxNumWorkers
。
--maxNumWorkers
的起始值也會決定要為工作分配多少永久磁碟。系統會以固定資源池的永久磁碟來部署管道,數量相當於 --maxNumWorkers
。串流期間,系統會重新分配永久磁碟,使每個工作站都有相同數量的連接磁碟。
如果您設定 --maxNumWorkers
,請確保該值提供的磁碟數量足夠管道使用。設定初始值時,請考量日後的成長。如要瞭解永久磁碟效能,請參閱「設定永久磁碟和 VM」。Dataflow 會針對 Persistent Disk 用量計費,並有Compute Engine 配額,包括 Persistent Disk 配額。
根據預設,使用 Streaming Engine 的串流工作工作站數量下限為 1,不使用 Streaming Engine 的工作則為 (maxNumWorkers
/15),並向上取整。
更新自動調度資源範圍
對於使用 Streaming Engine 的工作,您可以調整工作站數量下限和上限,不必停止或取代工作。如要調整這些值,請使用執行中的工作更新。更新下列工作選項:
--min-num-workers
:最少工作人員人數。--max-num-workers
:工作人員人數上限。
gcloud
使用 gcloud dataflow jobs update-options
指令:
gcloud dataflow jobs update-options \ --region=REGION \ --min-num-workers=MINIMUM_WORKERS \ --max-num-workers=MAXIMUM_WORKERS \ JOB_ID
更改下列內容:
- REGION:工作地區端點的地區 ID
- MINIMUM_WORKERS:Compute Engine 執行個體的最低數量
- MAXIMUM_WORKERS:Compute Engine 執行個體的數量上限
- JOB_ID:要更新的工作 ID
你也可以個別更新 --min-num-workers
和 --max-num-workers
。
REST
請使用 projects.locations.jobs.update
方法:
PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=runtime_updatable_params.max_num_workers,runtime_updatable_params.min_num_workers { "runtime_updatable_params": { "min_num_workers": MINIMUM_WORKERS, "max_num_workers": MAXIMUM_WORKERS } }
更改下列內容:
- PROJECT_ID:Dataflow 工作的 Google Cloud 專案 ID
- REGION:工作地區端點的地區 ID
- JOB_ID:要更新的工作 ID
- MINIMUM_WORKERS:Compute Engine 執行個體的最低數量
- MAXIMUM_WORKERS:Compute Engine 執行個體的數量上限
你也可以個別更新 min_num_workers
和 max_num_workers
。
在 updateMask
查詢參數中指定要更新的參數,並在要求主體的 runtimeUpdatableParams
欄位中加入更新的值。以下範例會更新 min_num_workers
:
PUT https://dataflow.googleapis.com/v1b3/projects/my_project/locations/us-central1/jobs/job1?updateMask=runtime_updatable_params.min_num_workers { "runtime_updatable_params": { "min_num_workers": 5 } }
對於未使用 Streaming Engine 的工作,您可以取代現有工作,並更新 maxNumWorkers
的值。
如果您更新未使用 Streaming Engine 的串流工作,更新後的工作預設會停用水平自動調度資源。如要保持啟用自動調度資源功能,請為更新後的工作指定 --autoscalingAlgorithm
和 --maxNumWorkers
。
設定工作站使用率提示
Dataflow 會使用平均 CPU 使用率做為信號,判斷何時套用水平自動調度資源功能。根據預設,Dataflow 會將目標 CPU 使用率設為 0.8。如果使用率超出這個範圍,Dataflow 可能會新增或移除工作站。
如要進一步控管自動調度資源行為,您可以將目標 CPU 使用率設為 [0.1, 0.9] 範圍內的值。
如要降低尖峰延遲時間,請設定較低的 CPU 使用率值。值越小,Dataflow 就越能積極地因應工作人員使用率的成長而擴充,並更保守地縮減規模,以提升穩定性。如果管道以穩定狀態執行,較低的值也能提供更多空間,通常會導致尾端延遲時間較短。(尾部延遲時間是指處理新記錄前最長的等待時間)。
如果想在流量暴增時節省資源並降低費用,請設定較高的值。值越高,越能避免過度放大,但延遲時間會較長。
如要在執行非範本工作時設定使用率提示,請設定worker_utilization_hint
服務選項。如果是範本工作,請更新使用率提示,因為系統不支援服務選項。
以下範例說明如何使用 worker_utilization_hint
:
Java
--dataflowServiceOptions=worker_utilization_hint=TARGET_UTILIZATION
將 TARGET_UTILIZATION 替換為 [0.1, 0.9] 範圍內的值。
Python
--dataflow_service_options=worker_utilization_hint=TARGET_UTILIZATION
將 TARGET_UTILIZATION 替換為 [0.1, 0.9] 範圍內的值。
Go
--dataflow_service_options=worker_utilization_hint=TARGET_UTILIZATION
將 TARGET_UTILIZATION 替換為 [0.1, 0.9] 範圍內的值。
對於新管道,建議您使用預設設定,在實際負載下進行測試。然後評估自動調度資源行為是否適用於您的管道,並視需要進行調整。
使用率提示只是 Dataflow 決定是否要調整工作站規模時的其中一個因素。其他因素 (例如待處理項目和可用鍵) 可能會覆寫提示值。此外,提示並非嚴格的目標。自動調度器會盡量將 CPU 使用率維持在提示值範圍內,但匯總使用率指標可能會高於或低於提示值。詳情請參閱「串流自動調度資源啟發式方法」。
更新使用率提示
如要在作業執行期間更新使用率提示,請按照下列方式執行執行中更新:
gcloud
使用 gcloud dataflow jobs update-options
指令:
gcloud dataflow jobs update-options \ --region=REGION \ --worker-utilization-hint=TARGET_UTILIZATION \ JOB_ID
更改下列內容:
- REGION:工作地區端點的地區 ID
- JOB_ID:要更新的工作 ID
- TARGET_UTILIZATION:範圍介於 [0.1, 0.9] 的值
如要將使用率提示重設為預設值,請使用下列 gcloud 指令:
gcloud dataflow jobs update-options \ --unset-worker-utilization-hint \ --region=REGION \ --project=PROJECT_ID \ JOB_ID
REST
請使用 projects.locations.jobs.update
方法:
PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=runtime_updatable_params.worker_utilization_hint { "runtime_updatable_params": { "worker_utilization_hint": TARGET_UTILIZATION } }
更改下列內容:
- PROJECT_ID:Dataflow 工作的 Google Cloud 專案 ID。
- REGION:工作地區端點的地區 ID。
- JOB_ID:要更新的工作 ID。
- TARGET_UTILIZATION:範圍介於 [0.1, 0.9] 的值
設定工作站平行處理提示
如要處理自動調度資源,且作業時間較長,較不依賴 CPU,例如機器學習密集型工作負載,您可以使用 Apache Beam 資源提示設定工作站平行處理提示。這些提示會將自動調度資源切換至其他模式,以便針對需要大量 GPU 資源的工作負載或處理時間較長的轉換進行最佳化。
以下範例說明如何將平行處理提示附加至轉換:
Java
pcoll.apply(MyCompositeTransform.of(...)
.setResourceHints(
ResourceHints.create()
.withMaxActiveBundlesPerWorker(TARGET_PARALLELISM_PER_WORKER)))
將 TARGET_PARALLELISM_PER_WORKER 替換為適合您用途的值。如需一般指引,請參閱如何選擇合適的起始值。
Python
pcoll | MyPTransform().with_resource_hints(
max_active_bundles_per_worker=TARGET_PARALLELISM_PER_WORKER)
將 TARGET_PARALLELISM_PER_WORKER 替換為適合您用途的值。如需一般指引,請參閱如何選擇合適的起始值。
選擇工作站平行處理提示值
如果是機器學習用途,建議從每個工作站中平行執行的模型數量開始。這個值會受到工作站加速器容量和模型大小的限制。
如果是其他用途,管道會受到記憶體或 CPU 限制。如果是受記憶體限制的管道,請使用記憶體限制計算最大平行處理量。如果是受 CPU 限制的管道,建議保留預設的自動調整規模政策,不要提供平行處理提示。
您可以微調值,以配合其他階段的處理需求,例如寫入接收器。當模型平行處理為 2 時,將值增加 1 或 2 有助於辨識寫入接收器的處理時間較快,因為這樣可提供更多彈性,以因應其他階段的處理作業。如果管道不涉及隨機播放,且轉換作業會合併為單一階段,則不需要調整其他轉換作業的值。
您也可以調整這個值,模擬可接受的待處理事項延遲效果。舉例來說,如果您可接受最多 10 分鐘的延遲,且模型的平均處理時間為 1 分鐘,則假設工作站數量上限設為 10,您可以選擇將值增加 1。
GPU 密集型自動調度啟發式演算法
在透過設定平行處理提示指出 GPU 密集型設定時,Dataflow 會在自動調整規模時考量多項因素。 包括:
- 可用金鑰。索引鍵是 Dataflow 平行處理的基本單位。
- 每個工作站可執行的套件數量上限。這表示工作站內處理平行作業的理想數量上限。
調度決策背後的概念是計算處理目前負載 (由可用鍵指出) 所需的工作站數量。舉例來說,如果可處理的金鑰有 100 個,且每個工作站的平行處理量上限為 10,則總共應有 10 個工作站。
如果管道複雜,且有大量需要大量 GPU 運算資源的工作,以及需要大量 CPU 運算資源的轉換,建議啟用適當調整。這樣一來,服務就能清楚區分 CPU 密集型和 GPU 密集型工作,並據此調整每個工作站集區的大小。
串流自動調度資源啟發式演算法
對於串流管道,自動水平調度資源的目標是盡量減少待處理作業數量,同時盡量提高工作站的使用率及總處理量,並且快速因應負載暴增的情況。
Dataflow 會考量多項因素來自動調度資源,包括:
待處理。預估待處理時間是根據輸入來源的待處理位元組和輸送量計算得出。如果預估待處理時間超過 15 秒,管道就會視為待處理工作量過多。
目標 CPU 使用率。平均 CPU 使用率的預設目標為 0.8。您可以覆寫這個值。
可用金鑰。索引鍵是 Dataflow 平行處理的基本單位。
在某些情況下,Dataflow 會根據下列因素做出自動調整規模決策。如果作業使用這些因素,您可以在「Autoscaling」(自動調度資源) 指標分頁中查看相關資訊。
以索引鍵為準的節流會根據工作收到的處理索引鍵數量,計算使用者工作站的上限,因為每個索引鍵一次只能由一個工作站處理。
縮減抑制。如果 Dataflow 偵測到不穩定的自動調度資源決策,就會減緩縮減速度,以提升穩定性。
以 CPU 為準的升頻會以高 CPU 使用率做為升頻條件。
對於不使用 Streaming Engine 的串流工作,擴充作業可能會受到永久磁碟數量的限制。詳情請參閱「設定自動調度資源範圍」。
GPU 密集型自動調度資源 (如果設定工作站平行處理提示)。詳情請參閱「GPU 密集型自動調度啟發式演算法」。
升級。如果串流管道積壓的工作量持續數分鐘,且工作站的平行處理程度充足,Dataflow 就會擴大資源規模。假設每個工作站的目前總處理量不變,Dataflow 會嘗試在擴充後約 150 秒內清除待處理工作量。如果待處理工作數量過多,但工作站的平行處理量不足以支援更多工作站,管道就不會擴充。(如果平行處理可用的金鑰數量不足,即使增加工作站數量,也無法加快處理待處理工作的速度)。
縮減規模:自動調度器決定縮減規模時,積壓工作是優先考量的因素。自動配置器會將待處理工作量控制在 15 秒內。如果待處理工作數量降至 10 秒以下,且工作站平均使用率低於 CPU 使用率目標,Dataflow 就會縮減規模。只要待處理工作數量可接受,自動配置器就會盡量將 CPU 使用率維持在目標 CPU 使用率附近。不過,如果使用率已相當接近目標,自動調度器可能會維持工作站數量不變,因為每次縮減都會產生費用。
Streaming Engine 也會根據計時器積壓工作,採用預測式自動調度資源技術。串流管道中的無界限資料會依時間戳記分組,劃分為時間區間。時間區間結束時,系統會針對該時間區間中處理的每個鍵觸發計時器。計時器觸發表示特定金鑰的時間區間已過期。串流引擎可以測量計時器積壓工作,並預測視窗結尾會觸發多少計時器。Dataflow 可以使用計時器待處理工作做為信號,估算未來計時器觸發時必須進行的處理量。根據預估的未來負載,Dataflow 會預先自動調度資源,以滿足預期需求。
指標
如要查看目前的工作自動調度資源限制,請查詢下列指標:
job/max_worker_instances_limit
:工作人員人數上限。job/min_worker_instances_limit
:最少工作人員人數。
如要取得工作站利用率的相關資訊,請查詢下列指標:
job/aggregated_worker_utilization
:匯總的工作人員使用率。job/worker_utilization_hint
:目前的工作人員使用率提示。
如要深入瞭解自動調度資源的行為,請查詢下列指標:
job.worker_utilization_hint_is_actively_used
:指出自動配置器是否積極採用工作人員使用率提示。如果其他因素在取樣這項指標時覆寫提示,值會是false
。job/horizontal_worker_scaling
:說明自動調度資源器採取的決策。這項指標包含下列標籤:direction
:指定自動調度器是否擴充、縮減或未採取任何動作。rationale
:指定自動調度資源決策的理由。
詳情請參閱「Cloud Monitoring 指標」。這些指標也會顯示在自動調整規模監控圖表中。