更新現有管道

本文說明如何更新進行中的串流工作。您可能會因為以下原因而希望更新現有 Dataflow 工作:

  • 您想增強或改善管道程式碼。
  • 您想修正管道程式碼中的錯誤。
  • 您想更新管道,以因應資料格式的異動,或資料來源中的版本或其他變更。
  • 您想為所有 Dataflow 工作人員修補與 Container-Optimized OS 相關的安全漏洞。
  • 您想調度串流 Apache Beam 管道的資源,以使用不同數量的工作站。

更新工作的方式有兩種:

  • 執行中工作更新:對於使用 Streaming Engine 的串流工作,您可以更新 min-num-workersmax-num-workers 工作選項,不必停止工作或變更工作 ID。
  • 替換工作:如要執行更新後的管道程式碼,或更新進行中工作更新作業不支援的工作選項,請啟動新工作來取代現有工作。如要確認取代工作是否有效,請先驗證新工作的工作圖,再啟動新工作。

更新工作時,Dataflow 服務會在目前執行中的工作與可能的替換工作之間執行相容性檢查,這項相容性檢查可確保將中繼狀態資訊和緩衝資料等項目,從先前的工作轉移至替換的工作。

您也可以使用 Apache Beam SDK 的內建記錄基礎架構,在更新工作時記錄資訊。詳情請參閱「處理管道記錄」。如要找出管道程式碼的問題,請使用DEBUG記錄層級

更新執行中的工作選項

如果串流工作使用 Streaming Engine,您可以更新下列工作選項,不必停止工作或變更工作 ID:

  • min-num-workers:Compute Engine 執行個體的最低數量。
  • max-num-workers:Compute Engine 執行個體的數量上限。
  • worker-utilization-hint目標 CPU 使用率,範圍為 [0.1, 0.9]

如要更新其他工作,您必須取代目前的工作。 詳情請參閱「啟動替代工作」。

執行飛行中更新

如要更新執行中的工作選項,請按照下列步驟操作。

gcloud

使用 gcloud dataflow jobs update-options 指令:

gcloud dataflow jobs update-options \
  --region=REGION \
  --min-num-workers=MINIMUM_WORKERS \
  --max-num-workers=MAXIMUM_WORKERS \
  --worker-utilization-hint=TARGET_UTILIZATION \
  JOB_ID

更改下列內容:

  • REGION:工作區域的 ID
  • MINIMUM_WORKERS:Compute Engine 執行個體的最低數量
  • MAXIMUM_WORKERS:Compute Engine 執行個體的數量上限
  • TARGET_UTILIZATION:範圍介於 [0.1, 0.9] 的值
  • JOB_ID:要更新的工作 ID

您也可以個別更新 --min-num-workers--max-num-workersworker-utilization-hint

REST

請使用 projects.locations.jobs.update 方法:

PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=MASK
{
  "runtime_updatable_params": {
    "min_num_workers": MINIMUM_WORKERS,
    "max_num_workers": MAXIMUM_WORKERS,
    "worker_utilization_hint": TARGET_UTILIZATION
  }
}

更改下列內容:

  • MASK:以逗號分隔的參數清單,列出要更新的參數,可從下列項目中選擇:
    • runtime_updatable_params.max_num_workers
    • runtime_updatable_params.min_num_workers
    • runtime_updatable_params.worker_utilization_hint
  • PROJECT_ID:Dataflow 工作的 Google Cloud 專案 ID
  • REGION:工作區域的 ID
  • JOB_ID:要更新的工作 ID
  • MINIMUM_WORKERS:Compute Engine 執行個體的最低數量
  • MAXIMUM_WORKERS:Compute Engine 執行個體的數量上限
  • TARGET_UTILIZATION:範圍介於 [0.1, 0.9] 的值

你也可以個別更新 min_num_workersmax_num_workersworker_utilization_hint。在 updateMask 查詢參數中指定要更新的參數,並在要求主體的 runtimeUpdatableParams 欄位中加入更新的值。以下範例會更新 min_num_workers

PUT https://dataflow.googleapis.com/v1b3/projects/my_project/locations/us-central1/jobs/job1?updateMask=runtime_updatable_params.min_num_workers
{
  "runtime_updatable_params": {
    "min_num_workers": 5
  }
}

工作必須處於執行中狀態,才能接收更新。如果工作尚未開始或已取消,就會發生錯誤。同樣地,如果您啟動替代工作,請等待該工作開始執行,再將任何進行中的更新傳送至新工作。

提交更新要求後,建議等待要求完成再傳送其他更新。查看工作記錄,瞭解要求何時完成。

驗證替換工作

如要驗證替代工作是否有效,請先驗證新工作的工作圖,再啟動新工作。在 Dataflow 中,工作圖是管道的圖形表示法。驗證工作圖表可降低更新後管道發生錯誤或管道失敗的風險。此外,您不需要停止原始工作即可驗證更新,因此工作不會發生任何停機時間。

如要驗證工作圖,請按照步驟啟動替換工作。在更新指令中加入 graph_validate_only Dataflow 服務選項

Java

  • 傳送 --update 選項。
  • PipelineOptions 中的 --jobName 選項設為您要更新的工作名稱。
  • --region 選項設為與要更新的工作相同的地區。
  • 加入 --dataflowServiceOptions=graph_validate_only 服務選項。
  • 如果管道中有任何轉換名稱有所變更,您必須提供轉換對應,並使用 --transformNameMapping 選項加以傳送。
  • 如果您提交的替代工作使用較新版本的 Apache Beam SDK,請將 --updateCompatibilityVersion 設為原始工作使用的 Apache Beam SDK 版本。

Python

  • 傳送 --update 選項。
  • PipelineOptions 中的 --job_name 選項設為您要更新的工作名稱。
  • --region 選項設為與要更新的工作相同的地區。
  • 加入 --dataflow_service_options=graph_validate_only 服務選項。
  • 如果管道中有任何轉換名稱有所變更,您必須提供轉換對應,並使用 --transform_name_mapping 選項加以傳送。
  • 如果您提交的替代工作使用較新版本的 Apache Beam SDK,請將 --updateCompatibilityVersion 設為原始工作使用的 Apache Beam SDK 版本。

Go

  • 傳送 --update 選項。
  • --job_name 選項設為您要更新的工作名稱。
  • --region 選項設為與要更新的工作相同的地區。
  • 加入 --dataflow_service_options=graph_validate_only 服務選項。
  • 如果管道中有任何轉換名稱有所變更,您必須提供轉換對應,並使用 --transform_name_mapping 選項加以傳送。

gcloud

如要驗證 Flex 範本工作的作業圖,請使用 gcloud dataflow flex-template run 指令搭配 additional-experiments 選項:

  • 傳送 --update 選項。
  • JOB_NAME 設為您要更新的工作名稱。
  • --region 選項設為與要更新的工作相同的地區。
  • 加入 --additional-experiments=graph_validate_only 選項。
  • 如果管道中有任何轉換名稱有所變更,您必須提供轉換對應,並使用 --transform-name-mappings 選項加以傳送。

例如:

gcloud dataflow flex-template run JOB_NAME --additional-experiments=graph_validate_only

JOB_NAME 替換為要更新的工作名稱。

REST

FlexTemplateRuntimeEnvironment (Flex 範本) 或 RuntimeEnvironment 物件中使用 additionalExperiments 欄位。

{
  additionalExperiments : ["graph_validate_only"]
  ...
}

graph_validate_only 服務選項只會驗證管道更新。建立或啟動管道時,請勿使用這個選項。如要更新管道,請啟動替換工作,但不要使用 graph_validate_only 服務選項。

工作圖表驗證成功後,工作狀態和工作記錄會顯示下列狀態:

  • 工作狀態JOB_STATE_DONE
  • 在 Google Cloud 控制台中,「Job status」(工作狀態)Succeeded
  • 作業記錄中會顯示以下訊息:

    Workflow job: JOB_ID succeeded validation. Marking graph_validate_only job as Done.
    

如果工作圖驗證失敗,工作狀態和工作記錄檔會顯示下列狀態:

  • 工作狀態JOB_STATE_FAILED
  • 在 Google Cloud 控制台中,「Job status」(工作狀態)Failed
  • 工作記錄中會顯示訊息,說明不相容錯誤。訊息內容取決於錯誤。

啟動替換工作

您可能基於下列原因取代現有工作:

  • 執行更新後的管道程式碼。
  • 如要更新不支援即時更新的工作選項。

如要確認替代工作是否有效,請先驗證新工作的工作圖,再啟動新工作。

啟動替換工作時,除了設定工作的標準選項以外,您還必須設定下列管道選項以執行更新程序:

Java

  • 傳送 --update 選項。
  • PipelineOptions 中的 --jobName 選項設為您要更新的工作名稱。
  • --region 選項設為與要更新的工作相同的地區。
  • 如果管道中有任何轉換名稱有所變更,您必須提供轉換對應,並使用 --transformNameMapping 選項加以傳送。
  • 如果您提交的替代工作使用較新版本的 Apache Beam SDK,請將 --updateCompatibilityVersion 設為原始工作使用的 Apache Beam SDK 版本。

Python

  • 傳送 --update 選項。
  • PipelineOptions 中的 --job_name 選項設為您要更新的工作名稱。
  • --region 選項設為與要更新的工作相同的地區。
  • 如果管道中有任何轉換名稱有所變更,您必須提供轉換對應,並使用 --transform_name_mapping 選項加以傳送。
  • 如果您提交的替代工作使用較新版本的 Apache Beam SDK,請將 --updateCompatibilityVersion 設為原始工作使用的 Apache Beam SDK 版本。

Go

  • 傳送 --update 選項。
  • --job_name 選項設為您要更新的工作名稱。
  • --region 選項設為與要更新的工作相同的地區。
  • 如果管道中有任何轉換名稱有所變更,您必須提供轉換對應,並使用 --transform_name_mapping 選項加以傳送。

gcloud

如要使用 gcloud CLI 更新彈性範本作業,請使用 gcloud dataflow flex-template run 指令。系統不支援使用 gcloud CLI 更新其他工作。

  • 傳送 --update 選項。
  • JOB_NAME 設為您要更新的工作名稱。
  • --region 選項設為與要更新的工作相同的地區。
  • 如果管道中有任何轉換名稱有所變更,您必須提供轉換對應,並使用 --transform-name-mappings 選項加以傳送。

REST

這些操作說明會示範如何使用 REST API 更新非範本工作。如要使用 REST API 更新傳統範本工作,請參閱「更新自訂範本串流工作」。如要使用 REST API 更新 Flex 範本工作,請參閱「更新 Flex 範本工作」。

  1. 使用 projects.locations.jobs.get 方法,擷取要更換工作的 job 資源。加入 view 查詢參數,並將值設為 JOB_VIEW_DESCRIPTION。加入 JOB_VIEW_DESCRIPTION 可限制回應中的資料量,確保後續要求不會超過大小限制。如需更詳細的工作資訊,請使用 JOB_VIEW_ALL 值。

    GET https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?view=JOB_VIEW_DESCRIPTION
    

    替換下列值:

    • PROJECT_ID:Dataflow 工作的 Google Cloud 專案 ID
    • REGION:要更新的工作區域
    • JOB_ID:要更新的工作 ID
  2. 如要更新工作,請使用 projects.locations.jobs.create 方法。在要求主體中,使用您擷取的 job 資源。

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs
    {
      "id": JOB_ID,
      "replaceJobId": JOB_ID,
      "name": JOB_NAME,
      "type": "JOB_TYPE_STREAMING",
      "transformNameMapping": {
        string: string,
        ...
      },
    }
    

    更改下列內容:

    • JOB_ID:與要更新的工作 ID 相同。
    • JOB_NAME:與要更新的工作名稱相同。

    如果管道中有任何轉換名稱有所變更,您必須提供轉換對應,並使用 transformNameMapping 欄位加以傳送。

  3. 選用:如要使用 curl (Linux、macOS 或 Cloud Shell) 傳送要求,請將要求儲存至 JSON 檔案,然後執行下列指令:

    curl -X POST -d "@FILE_PATH" -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)"  https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs
    

    FILE_PATH 改成包含要求內文的 JSON 檔案路徑。

指定替換工作名稱

Java

啟動替換工作時,針對 --jobName 選項所傳送的值必須與待替換工作的名稱完全相符。

Python

啟動替換工作時,針對 --job_name 選項所傳送的值必須與待替換工作的名稱完全相符。

Go

啟動替換工作時,針對 --job_name 選項所傳送的值必須與待替換工作的名稱完全相符。

gcloud

啟動替換工作時,JOB_NAME 必須與待替換工作的名稱完全相符。

REST

replaceJobId 欄位的值設為與要更新的工作相同的工作 ID。如要找到正確的工作名稱值,請在 Dataflow 監控介面中選取先前的工作。然後在「Job info」側邊面板中,找到「Job ID」欄位。

如要找到正確的工作名稱值,請在 Dataflow 監控介面中選取先前的工作。接著在「工作資訊」側邊面板中,找到「工作名稱」欄位:

執行中 Dataflow 工作的工作資訊側邊面板。
圖 1:執行中 Dataflow 工作的「Job info」(工作資訊) 側邊面板,其中有「Job name」(工作名稱) 欄位。

或者,您也可以使用 Dataflow 指令列介面來查詢現有工作的清單。方法是在殼層或終端機視窗中輸入 gcloud dataflow jobs list 指令,取得 Google Cloud專案中的 Dataflow 工作清單,接著找出要替換之工作的 NAME 欄位:

JOB_ID                                    NAME                        TYPE       CREATION_TIME        STATE    REGION
2020-12-28_12_01_09-yourdataflowjobid     ps-topic                    Streaming  2020-12-28 20:01:10  Running  us-central1

建立轉換對應

如果替代管道變更了先前管道中的任何轉換名稱,Dataflow 服務會需要轉換對應。轉換對應會將先前管道程式碼中的已命名轉換,對應至替代管道程式碼中的名稱。

Java

透過下列一般格式使用 --transformNameMapping 指令列選項來傳送對應:

--transformNameMapping= . 
{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

您只需要針對先前管道與替換管道之間有變更的轉換名稱,在 --transformNameMapping 中提供對應項目即可。

使用 --transformNameMapping 執行作業時,您可能需要視殼層情況將引號逸出。例如,在 Bash 中:

--transformNameMapping='{"oldTransform1":"newTransform1",...}'

Python

透過下列一般格式使用 --transform_name_mapping 指令列選項來傳送對應:

--transform_name_mapping= .
{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

您只需要針對先前管道與替換管道之間有變更的轉換名稱,在 --transform_name_mapping 中提供對應項目即可。

使用 --transform_name_mapping 執行作業時,您可能需要視殼層情況將引號逸出。例如,在 Bash 中:

--transform_name_mapping='{"oldTransform1":"newTransform1",...}'

Go

透過下列一般格式使用 --transform_name_mapping 指令列選項來傳送對應:

--transform_name_mapping= .
{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

您只需要針對先前管道與替換管道之間有變更的轉換名稱,在 --transform_name_mapping 中提供對應項目即可。

使用 --transform_name_mapping 執行作業時,您可能需要視殼層情況將引號逸出。例如,在 Bash 中:

--transform_name_mapping='{"oldTransform1":"newTransform1",...}'

gcloud

透過下列一般格式使用 --transform-name-mappings 選項來傳送對應:

--transform-name-mappings= .
{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

您只需要針對先前管道與替換管道之間有變更的轉換名稱,在 --transform-name-mappings 中提供對應項目即可。

使用 --transform-name-mappings 執行作業時,您可能需要視殼層情況將引號逸出。舉例來說,在 Bash 中:

--transform-name-mappings='{"oldTransform1":"newTransform1",...}'

REST

使用下列一般格式,透過 transformNameMapping 欄位傳送對應:

"transformNameMapping": {
  oldTransform1: newTransform1,
  oldTransform2: newTransform2,
  ...
}

您只需要針對先前管道與替換管道之間有變更的轉換名稱,在 transformNameMapping 中提供對應項目即可。

判斷轉換名稱

對應中每個執行個體的轉換名稱,就是您在管道中套用轉換時所提供的名稱。例如:

Java

  .apply("FormatResults", ParDo
    .of(new DoFn<KV<String, Long>>, String>() {
      ...
     }
  }))

Python

  | 'FormatResults' >> beam.ParDo(MyDoFn())

Go

  // In Go, this is always the package-qualified name of the DoFn itself.
  // For example, if the FormatResults DoFn is in the main package, its name
  // is "main.FormatResults".
  beam.ParDo(s, FormatResults, results)

您也可以在 Dataflow 監控介面中查看工作執行圖,藉此取得先前工作的轉換名稱:

WordCount 管道的執行圖。
圖 2:Dataflow 監控介面中顯示的 WordCount 管道執行圖。

複合轉換命名

轉換名稱為階層式結構,以管道中的轉換階層結構為依據。如果管道有複合式轉換,則巢狀轉換的名稱取決於其包含的轉換。舉例來說,假設管道包含名為 CountWidgets 的複合式轉換,且該轉換包含名為 Parse 的內部轉換。轉換的全名為 CountWidgets/Parse,而且您必須在轉換對應中指定該全名。

如果新管道會將複合式轉換對應至不同的名稱,所有的巢狀轉換也會自動重新命名,您必須在轉換對應中為內部轉換指定變更後的名稱。

重構轉換階層

如果替換管道使用與先前管道不同的轉換階層,您就必須明確宣告這項對應。您可能重構了複合轉換,或管道依賴已變更的程式庫中的複合轉換,因此轉換階層結構可能不同。

舉例來說,您先前的管道套用了 CountWidgets 複合式轉換,其中包含名為 Parse 的內部轉換。替代管道會重構 CountWidgets,並在另一個名為 Scan 的轉換中將 Parse 轉成巢狀結構。您必須明確將先前管道的完整轉換名稱 (CountWidgets/Parse) 對應至新管道的轉換名稱 (CountWidgets/Scan/Parse),才能成功進行更新:

Java

--transformNameMapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}

如果您在替換管道中完全刪除轉換,則必須提供 null 對應。假設替代管道徹底移除了 CountWidgets/Parse 轉換:

--transformNameMapping={"CountWidgets/Parse":""}

Python

--transform_name_mapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}

如果您在替換管道中完全刪除轉換,則必須提供 null 對應。假設替代管道徹底移除了 CountWidgets/Parse 轉換:

--transform_name_mapping={"CountWidgets/Parse":""}

Go

--transform_name_mapping={"CountWidgets/main.Parse":"CountWidgets/Scan/main.Parse"}

如果您在替換管道中完全刪除轉換,則必須提供 null 對應。假設替代管道徹底移除了 CountWidgets/Parse 轉換:

--transform_name_mapping={"CountWidgets/main.Parse":""}

gcloud

--transform-name-mappings={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}

如果您在替換管道中完全刪除轉換,則必須提供 null 對應。假設替代管道徹底移除了 CountWidgets/Parse 轉換:

--transform-name-mappings={"CountWidgets/main.Parse":""}

REST

"transformNameMapping": {
  CountWidgets/Parse: CountWidgets/Scan/Parse
}

如果您在替換管道中完全刪除轉換,則必須提供 null 對應。假設替代管道徹底移除了 CountWidgets/Parse 轉換:

"transformNameMapping": {
  CountWidgets/main.Parse: null
}

更換工作造成的影響

替換現有工作時,新工作會執行更新後的管道程式碼。Dataflow 服務會保留工作名稱,但會使用更新後的工作 ID 執行替換工作。這個程序可能會導致停機,因為現有工作會停止、執行相容性檢查,然後啟動新工作。

更換作業會保留下列項目:

中繼狀態資料

保留先前工作的中繼狀態資料。狀態資料不包含記憶體內快取。如要在更新管道時保留記憶體內快取資料,請重構管道,將快取轉換為狀態資料輔助輸入。如要進一步瞭解如何使用側邊輸入,請參閱 Apache Beam 說明文件中的「側邊輸入模式」。

串流管道的 ValueState 和側邊輸入都有大小限制。 因此,如要保留大型快取,可能需要使用外部儲存空間,例如 Memorystore 或 Bigtable。

傳輸中的資料

「傳輸中」的資料仍將交由新管道中的轉換處理,但您在替代管道程式碼中新增的其他轉換不一定會生效,具體結果取決於記錄進行緩衝的位置。在本例中,現有管道具有下列轉換:

Java

  p.apply("Read", ReadStrings())
   .apply("Format", FormatStrings());

Python

  p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
    | 'Format' >> FormatStrings()

Go

   beam.ParDo(s, ReadStrings)
   beam.ParDo(s, FormatStrings)

您可以將工作替換為新的管道程式碼,如下所示:

Java

  p.apply("Read", ReadStrings())
   .apply("Remove", RemoveStringsStartingWithA())
   .apply("Format", FormatStrings());

Python

  p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
    | 'Remove' >> RemoveStringsStartingWithA()
    | 'Format' >> FormatStrings()

Go

  beam.ParDo(s, ReadStrings)
  beam.ParDo(s, RemoveStringsStartingWithA)
  beam.ParDo(s, FormatStrings)

即使您新增轉換來篩選出開頭為字母「A」的字串,但在進行下一個轉換 (FormatStrings) 後,您可能依然會看到從先前工作轉移而來、以字母「A」為開頭的已緩衝或傳輸中字串。

變更時間區間設定

您可以為替代管道中的 PCollection 元素變更時間區間設定觸發條件策略,但請務必小心。不論是時間區間設定或觸發條件策略,都不會影響已緩衝或傳輸中的資料。

建議您只對管道的時間區間設定進行較細微的變更,例如變更固定時間或滑動時間區間的持續時間。如果對時間區間設定或觸發條件進行重大變更 (例如變更時間區間設定演算法),可能會對管道輸出造成無法預測的結果。

工作相容性檢查

啟動替代工作時,Dataflow 服務會在替代工作與先前工作之間執行相容性檢查。如果通過相容性檢查,先前的工作就會停止,而替換工作隨即會在 Dataflow 服務中啟動,同時保有相同的工作名稱。如果未通過相容性檢查,先前的工作會繼續在 Dataflow 服務中執行,且替換工作會傳回錯誤。

Java

由於限制,您必須使用封鎖執行作業,才能在主控台或終端機中查看更新失敗錯誤。目前的解決方法包括下列步驟:

  1. 在管道程式碼中使用 pipeline.run().waitUntilFinish()
  2. 使用 --update 選項執行替換管道程式。
  3. 等候替換工作成功通過相容性檢查。
  4. 輸入 Ctrl+C 以結束封鎖執行器程序。

或者,您可以在 Dataflow 監控介面中監控替換工作的狀態。如果工作順利啟動,就表示工作也通過了相容性檢查。

Python

由於限制,您必須使用封鎖執行作業,才能在主控台或終端機中查看更新失敗錯誤。目前的解決方法包括下列步驟:

  1. 在管道程式碼中使用 pipeline.run().wait_until_finish()
  2. 使用 --update 選項執行替換管道程式。
  3. 等候替換工作成功通過相容性檢查。
  4. 輸入 Ctrl+C 以結束封鎖執行器程序。

或者,您可以在 Dataflow 監控介面中監控替換工作的狀態。如果工作順利啟動,就表示工作也通過了相容性檢查。

Go

由於限制,您必須使用封鎖執行作業,才能在主控台或終端機中查看更新失敗錯誤。具體來說,您必須使用 --execute_async--async 標記指定非封鎖執行作業。目前的解決方法包括下列步驟:

  1. 使用 --update 選項執行替換管道程式,且不使用 --execute_async--async 標記。
  2. 等候替換工作成功通過相容性檢查。
  3. 輸入 Ctrl+C 以結束封鎖執行器程序。

gcloud

由於限制,您必須使用封鎖執行作業,才能在主控台或終端機中查看更新失敗錯誤。目前的解決方法包括下列步驟:

  1. 如果是 Java 管道,請在管道程式碼中使用 pipeline.run().waitUntilFinish()。如果是 Python 管道,請在管道程式碼中使用 pipeline.run().wait_until_finish()。 如果是 Go 管道,請按照「Go」分頁中的步驟操作。
  2. 使用 --update 選項執行替換管道程式。
  3. 等候替換工作成功通過相容性檢查。
  4. 輸入 Ctrl+C 以結束封鎖執行器程序。

REST

由於限制,您必須使用封鎖執行作業,才能在主控台或終端機中查看更新失敗錯誤。目前的解決方法包括下列步驟:

  • 如果是 Java 管道,請在管道程式碼中使用 pipeline.run().waitUntilFinish()。如果是 Python 管道,請在管道程式碼中使用 pipeline.run().wait_until_finish()。 如果是 Go 管道,請按照「Go」分頁中的步驟操作。
  • 使用 replaceJobId 欄位執行替代管道程式。
  • 等候替換工作成功通過相容性檢查。
  • 輸入 Ctrl+C 以結束封鎖執行器程序。

相容性檢查會使用您提供的轉換對應,確保 Dataflow 能將中繼狀態資料從先前工作中的步驟轉移至替代工作。相容性檢查也可確保管道中的 PCollection 使用相同的 Coder。變更 Coder 可能會導致相容性檢查失敗,因為任何傳輸中資料或已緩衝記錄都可能無法在替代管道中正確序列化。

避免相容性中斷

先前管道與替代管道之間的某些差異,可能會導致相容性檢查失敗。這些差異包括:

  • 變更管道圖,但不提供對應。更新工作時,Dataflow 會嘗試將先前工作中的轉換與替代工作中的轉換進行配對。這個比對程序可協助 Dataflow 轉移每個步驟的中繼狀態資料。如果您重新命名或移除任何步驟,就必須提供轉換對應,這樣 Dataflow 才能據此比對狀態資料。
  • 變更步驟的側邊輸入。如果您為替換管道中的轉換新增或移除側邊輸入,將會導致相容性檢查失敗。
  • 變更步驟的 Coder。更新工作時,Dataflow 會保留目前緩衝的任何資料記錄,並在替代工作中處理這些資料記錄。舉例來說,緩衝資料可能會在時間區間解析時發生。如果替代工作使用不同或不相容的資料編碼,Dataflow 就無法對這些記錄進行序列化或取消序列化作業。
  • 從管道中移除「有狀態」的作業。如果您從管道中移除有狀態的作業,替代工作可能無法通過相容性檢查。Dataflow 可以整併多個步驟以提高效率。如果您從整併後的步驟中移除了需要狀態的作業,就無法通過檢查。有狀態的作業包括:

    • 會產生或使用側邊輸入的轉換。
    • I/O 讀取。
    • 使用帶鍵狀態的轉換。
    • 具有時間區間合併的轉換。
  • 變更有狀態的 DoFn 變數。 如果是持續執行的串流工作,如果管道包含有狀態的 DoFn,變更有狀態的 DoFn 變數可能會導致管道失敗。

  • 嘗試在不同的地理區域執行替換工作。 在執行先前工作的同一個區域中執行替代工作。

更新結構定義

Apache Beam 允許 PCollection 具有具名欄位的結構定義,在這種情況下,不需要明確的 Coder。如果特定結構定義的欄位名稱和類型 (包括巢狀欄位) 維持不變,該結構定義就不會導致更新檢查失敗。不過,如果新管道的其他區段不相容,更新作業可能仍會遭到封鎖。

演進結構定義

由於業務需求不斷演進,因此通常需要演進 PCollection 的結構定義。更新管道時,Dataflow 服務允許對結構定義進行下列變更:

  • 在結構定義中新增一或多個欄位,包括巢狀欄位。
  • 將必要 (不可為空值) 欄位類型設為選用 (可為空值)。

更新期間不得移除欄位、變更欄位名稱或變更欄位類型。

將其他資料傳送到現有的 ParDo 作業

您可以視用途採用下列其中一種方法,將其他 (頻外) 資料傳送到現有的 ParDo 作業中:

  • 將資訊序列化為 DoFn 子類別中的欄位。
  • 匿名 DoFn 中的方法所參照的任何變數都將自動序列化。
  • DoFn.startBundle() 中計算資料。
  • 使用 ParDo.withSideInputs 傳入資料。

如需詳細資訊,請參閱以下頁面: