本頁提供升級串流管道的指引和建議。舉例來說,您可能需要升級至新版 Apache Beam SDK,或是更新管道程式碼。我們提供不同選項,以因應各種情況。
批次管道會在工作完成時停止,但串流管道通常會持續執行,以提供不間斷的處理作業。因此,升級串流管道時,請考量下列事項:
- 您可能需要盡量減少或避免管道中斷。在某些情況下,部署新版管道時,您或許可以容忍暫時中斷處理作業。在其他情況下,應用程式可能無法容忍任何中斷。
- 管道更新程序必須處理結構定義變更,盡量減少對訊息處理和其他附加系統的干擾。舉例來說,如果事件處理管道中的訊息結構定義有所變更,下游資料接收器可能也需要變更結構定義。
視管道和更新需求而定,您可以使用下列其中一種方法更新串流管道:
如要進一步瞭解更新期間可能發生的問題,以及如何避免這些問題,請參閱「驗證替代工作」和「工作相容性檢查」。
最佳做法
- 升級 Apache Beam SDK 版本時,請勿一併變更任何管道程式碼。
- 每次變更後,請先測試 pipeline,再進行其他更新。
- 定期升級管道使用的 Apache Beam SDK 版本。
- 盡可能使用自動化方法,例如更新期間更新或自動平行管道更新。
執行飛行中更新
您可以更新部分進行中的串流管道,而不需停止工作。這種情況稱為「執行中作業更新」。僅在特定情況下,才能在工作執行期間更新工作:
- 工作必須使用 Streaming Engine。
- 工作必須處於執行中狀態。
- 您只會變更工作使用的工作站數量。
詳情請參閱「自動水平調度」頁面中的「設定自動調度範圍」。
如需瞭解如何更新執行中的作業,請參閱「更新現有管道」一文。
啟動替換工作
如果更新後的工作與現有工作相容,您可以使用 update
選項更新管道。替換現有工作時,新工作會執行更新後的管道程式碼。Dataflow 服務會保留工作名稱,但會使用更新後的工作 ID 執行替換工作。這個程序可能會導致停機,因為現有工作會停止、執行相容性檢查,然後啟動新工作。詳情請參閱「取代工作會造成的影響」。
Dataflow 會執行相容性檢查,確保更新後的管道程式碼可以安全地部署至執行中的管道。某些程式碼變更會導致相容性檢查失敗,例如在現有步驟中新增或移除側邊輸入。如果相容性檢查失敗,您就無法執行就地作業更新。
如需啟動替代工作的操作說明,請參閱「啟動替代工作」。
如果管道更新與目前的工作不相容,您需要停止並替換管道。如果管道無法容忍停機時間,請執行平行管道。
停止並更換管道
如果可以暫時停止處理作業,您可以取消或排除管道,然後換成更新後的管道。取消管道後,Dataflow 會立即停止處理作業,並盡快關閉資源,這可能會導致部分正在處理的資料 (又稱傳輸中資料) 遺失。為避免資料遺失,在大多數情況下,建議採取排空動作。 您也可以使用 Dataflow 快照儲存串流管道的狀態,方便您啟動新版 Dataflow 工作,而不會失去狀態。詳情請參閱「使用 Dataflow 快照」一文。
排除管道時,系統會立即關閉所有處理中的時間區間,並啟動所有觸發條件。雖然不會遺失傳輸中的資料,但排空作業可能會導致視窗資料不完整。如果發生這種情況,處理中的視窗會發出部分或不完整的結果。詳情請參閱排空工作對作業的影響。現有工作完成後,請啟動包含更新管道程式碼的新串流工作,以便繼續處理資料。
使用這種方法時,現有串流工作停止到替代管道準備好繼續處理資料之間,會有一段停機時間。不過,取消或排空現有管道,然後使用更新後的管道啟動新工作,比執行平行管道簡單。
如需詳細操作說明,請參閱「排空 Dataflow 工作」。排空目前的工作後,請啟動名稱相同的新工作。
使用 Pub/Sub 快照和 Seek 重新處理訊息
在某些情況下,更換或取消耗盡的管道後,您可能需要重新處理先前傳送的 Pub/Sub 訊息。舉例來說,您可能需要使用更新後的商業邏輯重新處理資料。 Pub/Sub 搜尋功能可讓您重播 Pub/Sub 快照中的訊息。您可以搭配使用 Pub/Sub Seek 與 Dataflow,從建立訂閱項目快照的時間開始重新處理訊息。
在開發和測試期間,您也可以使用 Pub/Sub Seek 重複播放已知訊息,驗證管道的輸出內容。使用 Pub/Sub Seek 時,請勿在管道耗用訂閱項目時,搜尋訂閱快照。如果這麼做,搜尋可能會導致 Dataflow 的浮水印邏輯失效,並影響 Pub/Sub 訊息的僅處理一次作業。
建議您在終端機視窗中,按照下列 gcloud CLI 工作流程,搭配使用 Pub/Sub Seek 與 Dataflow 管道:
如要建立訂閱項目的快照,請使用
gcloud pubsub snapshots create
指令:gcloud pubsub snapshots create SNAPSHOT_NAME --subscription=PIPELINE_SUBSCRIPTION_NAME
如要排空或取消管道,請使用
gcloud dataflow jobs drain
指令或gcloud dataflow jobs cancel
指令:gcloud dataflow jobs drain JOB_ID
或
gcloud dataflow jobs cancel JOB_ID
如要搜尋快照,請使用
gcloud pubsub subscriptions seek
指令:gcloud pubsub subscriptions seek SNAPSHOT_NAME
部署會取用訂閱項目的新管道。
同時執行多個管道
如要避免更新期間串流管道中斷,可以執行平行管道。這種做法可讓您使用更新後的管道程式碼啟動新的串流工作,並與現有工作並行執行。您可以使用 Dataflow 的自動平行管道更新部署工作流程,也可以手動執行這些步驟。
平行管道總覽
建立新管道時,請使用與現有管道相同的時間區間設定策略。如果是手動工作流程,請讓現有管道繼續執行,直到其浮水印超過更新後管道處理的最早完整視窗時間戳記為止。然後排除或取消現有管道。 如果使用自動化工作流程,系統會為您完成這項工作。更新後的管道會繼續在原處執行,並有效接管處理作業。
下圖說明了這個程序。
在圖中,管道 B 是接手管道 A 的更新工作。值 t 是 Pipeline B 處理的最早完整時間範圍的時間戳記。值 w 是 Pipeline A 的浮水印。 為求簡單,我們假設浮水印完美無缺,沒有任何延遲資料。水平軸代表處理時間和實際時間。這兩個管道都使用五分鐘的固定 (翻滾) 時間區間。浮水印超過每個時間區間的結尾時,系統就會觸發結果。
由於並行輸出作業發生在兩個管道重疊的時間範圍內,請將兩個管道設定為將結果寫入不同目的地。下游系統隨後可使用兩個目的地接收器的抽象化 (例如資料庫檢視畫面),查詢合併結果。這些系統也可以使用抽象化功能,刪除重疊期間的重複結果。詳情請參閱「處理重複輸出內容」。
限制
使用自動或手動並行管道更新時,會受到下列限制:
- 僅限自動更新:新的平行工作必須是 Streaming Engine 工作。
- 新舊工作名稱不得相同,因為系統不允許並行執行同名工作。
- 在相同輸入內容上平行執行兩個管道,可能會導致資料重複、部分匯總,以及將資料插入接收器時可能發生排序問題。下游系統的設計必須能預測及管理這些結果。
- 從 Pub/Sub 來源讀取資料時,不建議將同一個訂閱項目用於多個管道,否則可能會導致正確性問題。不過,在某些用途 (例如擷取、轉換、載入 (ETL) 管道) 中,跨兩個管道使用相同訂閱項目可能會減少重複作業。只要為重疊時間長度提供非零值,就可能發生自動調度資源問題。使用執行中作業更新功能即可減輕影響。詳情請參閱「微調 Pub/Sub 串流管道的自動調度功能」。
- 如果是 Apache Kafka,您可以啟用 Kafka 中的偏移量提交功能,盡量減少重複資料。如要在 Kafka 中啟用偏移量提交,請參閱「提交回 Kafka」。
自動平行更新管道
Dataflow 提供 API 支援,可啟動平行取代作業。這個宣告式 API 會將執行程序步驟的手動工作抽象化。您要宣告要更新的工作,然後新工作會與舊工作並行執行。新的工作執行指定時間後,舊的工作就會排空。這項功能可避免更新期間處理作業暫停,並減少更新不相容管道所需的操作工作。
如果管道可容許部分重複或部分匯總,且插入資料時不需嚴格排序,則最適合使用這種更新方法。這個執行器非常適合 ETL 管道,以及使用至少一次串流模式和 Redistribute
轉換作業 (允許重複項目設為 true
) 的管道。
傳送自動平行管道更新要求
如要使用自動化工作流程,請啟動新的串流工作,並使用下列服務選項。啟動新工作時,工作名稱必須與舊工作不同。
Java
--dataflowServiceOptions="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflowServiceOptions="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
或者,您也可以指定舊工作的工作 ID:
--dataflowServiceOptions="parallel_replace_job_id=OLD_JOB_ID" \
--dataflowServiceOptions="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
Python
--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
或者,您也可以指定舊工作的工作 ID:
--dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
Go
--dataflow_service_options="parallel_replace_job_name=OLD_JOB_NAME" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
或者,您也可以指定舊工作的工作 ID:
--dataflow_service_options="parallel_replace_job_id=OLD_JOB_ID" \
--dataflow_service_options="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
gcloud
--additional-experiments="parallel_replace_job_name=OLD_JOB_NAME" \
--additional-experiments="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
或者,您也可以指定舊工作的工作 ID:
--additional-experiments="parallel_replace_job_id=OLD_JOB_ID" \
--additional-experiments="parallel_replace_job_min_parallel_pipelines_duration=DURATION"
替換下列變數:
- 您必須提供
parallel_replace_job_name
或parallel_replace_job_id
,才能識別要取代的工作。OLD_JOB_NAME
:如果您使用parallel_replace_job_name
,則為要取代的工作名稱。OLD_JOB_ID
:如果您使用parallel_replace_job_id
,這是要取代的工作 ID。
你必須提供
parallel_replace_job_min_parallel_pipelines_duration
值。DURATION
:以整數或浮點數表示,兩個管道並行執行的最短時間。這段時間過後,系統會向舊工作傳送排空訊號。時間長度必須介於 0 秒 (
0s
) 和 31 天 (744h
) 之間。 請使用s
、m
和h
指定秒數、分鐘數和時數。例如10m
為 10 分鐘。
啟動新工作時,Dataflow 會等待所有工作站完成佈建,再開始處理資料。如要監控部署狀態,請查看 Dataflow 工作記錄。
手動執行平行管道
如要處理更複雜的情況,或進一步控管更新程序,可以手動執行平行管道。讓現有管道繼續執行,直到浮水印超過更新後管道處理的最早完整視窗時間戳記為止。接著,排空或取消現有管道。
處理重複的輸出內容
以下範例說明處理重複輸出內容的方法。這兩個管道會將輸出內容寫入不同目的地,使用下游系統查詢結果,並從重疊期間中移除重複結果。這個範例使用的管道會從 Pub/Sub 讀取輸入資料、執行一些處理作業,然後將結果寫入 BigQuery。
在初始狀態下,現有的串流管道 (管道 A) 正在執行,並使用訂閱項目 (訂閱項目 A) 從 Pub/Sub 主題 (主題) 讀取訊息。結果會寫入 BigQuery 資料表 (資料表 A)。結果會透過 BigQuery 檢視畫面使用,該檢視畫面會做為門面,遮蓋基礎資料表變更。這個程序是外觀模式設計方法的應用。下圖顯示初始狀態。
為更新後的管道建立新訂閱項目 (訂閱項目 B)。部署更新後的管道 (管道 B),透過 訂閱 B 從 Pub/Sub 主題 (主題) 讀取資料,並寫入另一個 BigQuery 資料表 (資料表 B)。下圖說明瞭這個流程。
此時,管道 A 和 管道 B 會平行執行,並將結果寫入不同的資料表。您會將時間 t 記錄為 Pipeline B 處理的最早完整視窗的時間戳記。
當 Pipeline A 的浮水印超過時間 t 時,請排空 Pipeline A。排除管道時,所有開啟的視窗都會關閉,並完成處理中的資料。如果管道包含時間區間,且完整時間區間很重要 (假設沒有延遲資料),請先讓兩個管道都執行,直到有完整重疊的時間區間,再排空 Pipeline A。在所有傳輸中的資料處理完畢並寫入「資料表 A」後,停止「管道 A」的串流工作。下圖顯示這個階段。
此時只有「管道 B」正在執行。您可以從 BigQuery 檢視區塊 (Façade View) 查詢,該檢視區塊會做為資料表 A 和資料表 B 的外觀。如果兩個資料表中的資料列具有相同時間戳記,請將檢視區塊設為傳回資料表 B 中的資料列,或在資料表 B 中沒有資料列時,改為傳回資料表 A 中的資料列。下圖顯示從「資料表 A」和「資料表 B」讀取的檢視區塊 (「外觀檢視區塊」)。
此時,您可以刪除「訂閱方案 A」。
如果系統偵測到新管道部署作業有問題,平行管道可簡化回溯作業。在這個範例中,您可能想讓 Pipeline A 繼續執行,同時監控 Pipeline B 是否正常運作。如果管道 B 發生任何問題,您可以回溯至管道 A。
處理結構定義變異
資料處理系統通常需要配合架構突變,有時是因應業務需求變化,有時是基於技術考量。套用結構定義更新時,通常需要審慎規劃及執行,以免業務資訊系統中斷。
假設某個管道會從 Pub/Sub 主題讀取含有 JSON 酬載的訊息。管道會將每則訊息轉換為 TableRow
執行個體,然後將資料列寫入 BigQuery 資料表。輸出資料表的結構與管道處理的訊息類似。在下圖中,結構定義稱為「結構定義 A」。
隨著時間推移,訊息結構定義可能會發生重大變化。例如,新增、移除或取代欄位。結構定義 A 演變成新結構定義。在接下來的討論中,新結構定義稱為「結構定義 B」。在這種情況下,您需要更新管道 A,且輸出資料表結構定義必須支援結構定義 B。
對於輸出資料表,您可以在不停機的情況下執行部分結構定義突變。舉例來說,您可以新增欄位或放寬資料欄模式,例如將 REQUIRED
變更為 NULLABLE
,而不會發生停機情形。這些變動通常不會影響現有查詢。不過,如果結構定義突變,修改或移除現有的結構定義欄位,就會導致查詢中斷或其他問題。下列方法可因應變更,且不需要停機。
將管道寫入的資料分別存入主資料表和一或多個暫存資料表。主要資料表會儲存管道寫入的歷來資料。暫存資料表會儲存最新的管道輸出內容。您可以針對主資料表和暫存資料表定義 BigQuery 外觀檢視區塊,讓消費者查詢歷史資料和最新資料。
下圖修訂了先前的管道流程,納入暫存資料表 (暫存資料表 A)、主資料表和外觀檢視畫面。
在修訂後的流程中,管道 A 會處理使用結構定義 A 的訊息,並將輸出內容寫入具有相容結構定義的暫存資料表 A。主要資料表包含管道先前版本寫入的歷史資料,以及定期從暫存資料表合併的結果。消費者可以使用外觀檢視畫面查詢最新資料,包括歷來資料和即時資料。
當訊息結構定義從「結構定義 A」變更為「結構定義 B」時,您可能需要更新管道程式碼,才能與使用「結構定義 B」的訊息相容。現有管道必須更新為新的實作方式。執行平行管道可確保串流資料處理作業持續進行,不會中斷。終止並更換管道會導致處理作業中斷,因為有一段時間沒有管道執行作業。
更新後的管道會寫入使用結構定義 B 的額外暫存資料表 (暫存資料表 B)。您可以先使用協調式工作流程建立新的暫存資料表,再更新管道。更新外觀檢視畫面,納入新暫存資料表的結果,可能需要使用相關工作流程步驟。
下圖顯示更新後的流程,其中包含具有「結構定義 B」的「暫存資料表 B」,以及如何更新外觀檢視畫面,以納入主要資料表和兩個暫存資料表的內容。
您可以定期或視需要將暫存資料表併入主要資料表,這與管道更新是不同的程序。下圖顯示如何將「暫存資料表 A」併入主要資料表。
後續步驟
- 如需更新現有管道的詳細步驟,請參閱這篇文章。