調整串流管道的自動水平調度資源

在輸入資料量大的串流管道中,通常需要權衡成本和延遲時間。為維持低延遲,Dataflow 必須在流量增加時新增工作站。另一個因素是管道應以多快的速度擴大或縮減規模,以因應輸入資料速率的變化。

Dataflow 自動調度程式的預設設定適用於許多工作負載。不過,您可能會想針對特定情境調整這項行為。舉例來說,為了降低成本,您或許可以接受較高的平均延遲時間,或是希望 Dataflow 在流量暴增時更快擴充。

如要最佳化水平自動調度功能,可以調整下列參數:

設定自動調度資源範圍

建立新的串流工作時,您可以設定工作站的初始數量和工作站數量上限。如要這麼做,請指定下列管道選項

Java

  • --numWorkers:管道開始執行時可用的初始工作站數量
  • --maxNumWorkers:管道可用的工作站數量上限

Python

  • --num_workers:管道開始執行時可用的初始工作站數量
  • --max_num_workers:管道可用的工作站數量上限

Go

  • --num_workers:管道開始執行時可用的初始工作站數量
  • --max_num_workers:管道可用的工作站數量上限

如果串流工作使用 Streaming Engine,則 --maxNumWorkers 旗標為選用。預設值為 100。如果串流工作未使用 Streaming Engine,啟用自動水平調度資源功能時,就必須提供 --maxNumWorkers

--maxNumWorkers 的起始值也會決定要為工作分配多少永久磁碟。系統會以固定資源池的永久磁碟來部署管道,數量相當於 --maxNumWorkers。串流期間,系統會重新分配永久磁碟,使每個工作站都有相同數量的連接磁碟。

如果您設定 --maxNumWorkers,請確保該值提供的磁碟數量足夠管道使用。設定初始值時,請考量日後的成長。如要瞭解永久磁碟效能,請參閱「設定永久磁碟和 VM」。Dataflow 會針對 Persistent Disk 用量計費,並有Compute Engine 配額,包括 Persistent Disk 配額。

根據預設,使用 Streaming Engine 的串流工作工作站數量下限為 1,不使用 Streaming Engine 的工作則為 (maxNumWorkers/15),並向上取整。

更新自動調度資源範圍

對於使用 Streaming Engine 的工作,您可以調整工作站數量下限和上限,不必停止或取代工作。如要調整這些值,請使用執行中的工作更新。更新下列工作選項:

  • --min-num-workers:最少工作人員人數。
  • --max-num-workers:工作人員人數上限。

gcloud

使用 gcloud dataflow jobs update-options 指令:

gcloud dataflow jobs update-options \
  --region=REGION \
  --min-num-workers=MINIMUM_WORKERS \
  --max-num-workers=MAXIMUM_WORKERS \
  JOB_ID

更改下列內容:

  • REGION:工作地區端點的地區 ID
  • MINIMUM_WORKERS:Compute Engine 執行個體的最低數量
  • MAXIMUM_WORKERS:Compute Engine 執行個體的數量上限
  • JOB_ID:要更新的工作 ID

你也可以個別更新 --min-num-workers--max-num-workers

REST

請使用 projects.locations.jobs.update 方法:

PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=runtime_updatable_params.max_num_workers,runtime_updatable_params.min_num_workers
{
  "runtime_updatable_params": {
    "min_num_workers": MINIMUM_WORKERS,
    "max_num_workers": MAXIMUM_WORKERS
  }
}

更改下列內容:

  • PROJECT_ID:Dataflow 工作的 Google Cloud 專案 ID
  • REGION:工作地區端點的地區 ID
  • JOB_ID:要更新的工作 ID
  • MINIMUM_WORKERS:Compute Engine 執行個體的最低數量
  • MAXIMUM_WORKERS:Compute Engine 執行個體的數量上限

你也可以個別更新 min_num_workersmax_num_workers。 在 updateMask 查詢參數中指定要更新的參數,並在要求主體的 runtimeUpdatableParams 欄位中加入更新的值。以下範例會更新 min_num_workers

PUT https://dataflow.googleapis.com/v1b3/projects/my_project/locations/us-central1/jobs/job1?updateMask=runtime_updatable_params.min_num_workers
{
  "runtime_updatable_params": {
    "min_num_workers": 5
  }
}

對於未使用 Streaming Engine 的工作,您可以取代現有工作,並更新 maxNumWorkers 的值。

如果您更新未使用 Streaming Engine 的串流工作,更新後的工作預設會停用水平自動調度資源。如要保持啟用自動調度資源功能,請為更新後的工作指定 --autoscalingAlgorithm--maxNumWorkers

設定工作站使用率提示

Dataflow 會使用平均 CPU 使用率做為信號,判斷何時套用水平自動調度資源功能。根據預設,Dataflow 會將目標 CPU 使用率設為 0.8。如果使用率超出這個範圍,Dataflow 可能會新增或移除工作站。

如要進一步控管自動調度資源行為,您可以將目標 CPU 使用率設為 [0.1, 0.9] 範圍內的值。

  • 如要降低尖峰延遲時間,請設定較低的 CPU 使用率值。值越小,Dataflow 就越能積極地因應工作人員使用率的成長而擴充,並更保守地縮減規模,以提升穩定性。如果管道以穩定狀態執行,較低的值也能提供更多空間,通常會導致尾端延遲時間較短。(尾部延遲時間是指處理新記錄前最長的等待時間)。

  • 如果想在流量暴增時節省資源並降低費用,請設定較高的值。值越高,越能避免過度放大,但延遲時間會較長。

如要在執行非範本工作時設定使用率提示,請設定worker_utilization_hint 服務選項。如果是範本工作,請更新使用率提示,因為系統不支援服務選項。

以下範例說明如何使用 worker_utilization_hint

Java

--dataflowServiceOptions=worker_utilization_hint=TARGET_UTILIZATION

TARGET_UTILIZATION 替換為 [0.1, 0.9] 範圍內的值。

Python

--dataflow_service_options=worker_utilization_hint=TARGET_UTILIZATION

TARGET_UTILIZATION 替換為 [0.1, 0.9] 範圍內的值。

Go

--dataflow_service_options=worker_utilization_hint=TARGET_UTILIZATION

TARGET_UTILIZATION 替換為 [0.1, 0.9] 範圍內的值。

對於新管道,建議您使用預設設定,在實際負載下進行測試。然後評估自動調度資源行為是否適用於您的管道,並視需要進行調整。

使用率提示只是 Dataflow 決定是否要調整工作站規模時的其中一個因素。其他因素 (例如待處理項目和可用鍵) 可能會覆寫提示值。此外,提示並非嚴格的目標。自動調度器會盡量將 CPU 使用率維持在提示值範圍內,但匯總使用率指標可能會高於或低於提示值。詳情請參閱「串流自動調度資源啟發式方法」。

更新使用率提示

如要在作業執行期間更新使用率提示,請按照下列方式執行執行中更新

gcloud

使用 gcloud dataflow jobs update-options 指令:

gcloud dataflow jobs update-options \
  --region=REGION \
  --worker-utilization-hint=TARGET_UTILIZATION \
  JOB_ID

更改下列內容:

  • REGION:工作地區端點的地區 ID
  • JOB_ID:要更新的工作 ID
  • TARGET_UTILIZATION:範圍介於 [0.1, 0.9] 的值

如要將使用率提示重設為預設值,請使用下列 gcloud 指令:

gcloud dataflow jobs update-options \
  --unset-worker-utilization-hint \
  --region=REGION \
  --project=PROJECT_ID \
  JOB_ID

REST

請使用 projects.locations.jobs.update 方法:

PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=runtime_updatable_params.worker_utilization_hint
{
  "runtime_updatable_params": {
    "worker_utilization_hint": TARGET_UTILIZATION
  }
}

更改下列內容:

  • PROJECT_ID:Dataflow 工作的 Google Cloud 專案 ID。
  • REGION:工作地區端點的地區 ID。
  • JOB_ID:要更新的工作 ID。
  • TARGET_UTILIZATION:範圍介於 [0.1, 0.9] 的值

設定工作站平行處理提示

如要處理自動調度資源,且作業時間較長,較不依賴 CPU,例如機器學習密集型工作負載,您可以使用 Apache Beam 資源提示設定工作站平行處理提示。這些提示會將自動調度資源切換至其他模式,以便針對需要大量 GPU 資源的工作負載或處理時間較長的轉換進行最佳化。

以下範例說明如何將平行處理提示附加至轉換:

Java

pcoll.apply(MyCompositeTransform.of(...)
  .setResourceHints(
      ResourceHints.create()
          .withMaxActiveBundlesPerWorker(TARGET_PARALLELISM_PER_WORKER)))

TARGET_PARALLELISM_PER_WORKER 替換為適合您用途的值。如需一般指引,請參閱如何選擇合適的起始值。

Python

pcoll | MyPTransform().with_resource_hints(
  max_active_bundles_per_worker=TARGET_PARALLELISM_PER_WORKER)

TARGET_PARALLELISM_PER_WORKER 替換為適合您用途的值。如需一般指引,請參閱如何選擇合適的起始值。

選擇工作站平行處理提示值

如果是機器學習用途,建議從每個工作站中平行執行的模型數量開始。這個值會受到工作站加速器容量和模型大小的限制。

如果是其他用途,管道會受到記憶體或 CPU 限制。如果是受記憶體限制的管道,請使用記憶體限制計算最大平行處理量。如果是受 CPU 限制的管道,建議保留預設的自動調整規模政策,不要提供平行處理提示。

您可以微調值,以配合其他階段的處理需求,例如寫入接收器。當模型平行處理為 2 時,將值增加 1 或 2 有助於辨識寫入接收器的處理時間較快,因為這樣可提供更多彈性,以因應其他階段的處理作業。如果管道不涉及隨機播放,且轉換作業會合併為單一階段,則不需要調整其他轉換作業的值。

您也可以調整這個值,模擬可接受的待處理事項延遲效果。舉例來說,如果您可接受最多 10 分鐘的延遲,且模型的平均處理時間為 1 分鐘,則假設工作站數量上限設為 10,您可以選擇將值增加 1。

GPU 密集型自動調度啟發式演算法

在透過設定平行處理提示指出 GPU 密集型設定時,Dataflow 會在自動調整規模時考量多項因素。 包括:

  • 可用金鑰。索引鍵是 Dataflow 平行處理的基本單位。
  • 每個工作站可執行的套件數量上限。這表示工作站內處理平行作業的理想數量上限。

調度決策背後的概念是計算處理目前負載 (由可用鍵指出) 所需的工作站數量。舉例來說,如果可處理的金鑰有 100 個,且每個工作站的平行處理量上限為 10,則總共應有 10 個工作站。

如果管道複雜,且有大量需要大量 GPU 運算資源的工作,以及需要大量 CPU 運算資源的轉換,建議啟用適當調整。這樣一來,服務就能清楚區分 CPU 密集型和 GPU 密集型工作,並據此調整每個工作站集區的大小。

串流自動調度資源啟發式演算法

對於串流管道,自動水平調度資源的目標是盡量減少待處理作業數量,同時盡量提高工作站的使用率及總處理量,並且快速因應負載暴增的情況。

Dataflow 會考量多項因素來自動調度資源,包括:

  • 待處理。預估待處理時間是根據輸入來源的待處理位元組和輸送量計算得出。如果預估待處理時間超過 15 秒,管道就會視為待處理工作量過多。

  • 目標 CPU 使用率。平均 CPU 使用率的預設目標為 0.8。您可以覆寫這個值

  • 可用金鑰。索引鍵是 Dataflow 平行處理的基本單位。

在某些情況下,Dataflow 會根據下列因素做出自動調整規模決策。如果作業使用這些因素,您可以在「Autoscaling」(自動調度資源) 指標分頁中查看相關資訊

  • 以索引鍵為準的節流會根據工作收到的處理索引鍵數量,計算使用者工作站的上限,因為每個索引鍵一次只能由一個工作站處理。

  • 縮減抑制。如果 Dataflow 偵測到不穩定的自動調度資源決策,就會減緩縮減速度,以提升穩定性。

  • 以 CPU 為準的升頻會以高 CPU 使用率做為升頻條件。

  • 對於不使用 Streaming Engine 的串流工作,擴充作業可能會受到永久磁碟數量的限制。詳情請參閱「設定自動調度資源範圍」。

  • GPU 密集型自動調度資源 (如果設定工作站平行處理提示)。詳情請參閱「GPU 密集型自動調度啟發式演算法」。

升級。如果串流管道積壓的工作量持續數分鐘,且工作站的平行處理程度充足,Dataflow 就會擴大資源規模。假設每個工作站的目前總處理量不變,Dataflow 會嘗試在擴充後約 150 秒內清除待處理工作量。如果待處理工作數量過多,但工作站的平行處理量不足以支援更多工作站,管道就不會擴充。(如果平行處理可用的金鑰數量不足,即使增加工作站數量,也無法加快處理待處理工作的速度)。

縮減規模:自動調度器決定縮減規模時,積壓工作是優先考量的因素。自動配置器會將待處理工作量控制在 15 秒內。如果待處理工作數量降至 10 秒以下,且工作站平均使用率低於 CPU 使用率目標,Dataflow 就會縮減規模。只要待處理工作數量可接受,自動配置器就會盡量將 CPU 使用率維持在目標 CPU 使用率附近。不過,如果使用率已相當接近目標,自動調度器可能會維持工作站數量不變,因為每次縮減都會產生費用。

Streaming Engine 也會根據計時器積壓工作,採用預測式自動調度資源技術。串流管道中的無界限資料會依時間戳記分組,劃分為時間區間。時間區間結束時,系統會針對該時間區間中處理的每個鍵觸發計時器。計時器觸發表示特定金鑰的時間區間已過期。串流引擎可以測量計時器積壓工作,並預測視窗結尾會觸發多少計時器。Dataflow 可以使用計時器待處理工作做為信號,估算未來計時器觸發時必須進行的處理量。根據預估的未來負載,Dataflow 會預先自動調度資源,以滿足預期需求。

指標

如要查看目前的工作自動調度資源限制,請查詢下列指標:

  • job/max_worker_instances_limit:工作人員人數上限。
  • job/min_worker_instances_limit:最少工作人員人數。

如要取得工作站利用率的相關資訊,請查詢下列指標:

  • job/aggregated_worker_utilization:匯總的工作人員使用率。
  • job/worker_utilization_hint:目前的工作人員使用率提示。

如要深入瞭解自動調度資源的行為,請查詢下列指標:

  • job.worker_utilization_hint_is_actively_used:指出自動配置器是否積極採用工作人員使用率提示。如果其他因素在取樣這項指標時覆寫提示,值會是 false
  • job/horizontal_worker_scaling:說明自動調度資源器採取的決策。這項指標包含下列標籤:
    • direction:指定自動調度器是否擴充、縮減或未採取任何動作。
    • rationale:指定自動調度資源決策的理由。

詳情請參閱「Cloud Monitoring 指標」。這些指標也會顯示在自動調整規模監控圖表中。

後續步驟