如要停止 Dataflow 工作,請使用Google Cloud 主控台、Cloud Shell、安裝 Google Cloud CLI 的本機終端機,或 Dataflow REST API。
您可以透過下列三種方式之一停止 Dataflow 工作:
取消工作。這個方法適用於串流管道和批次管道。取消工作後,Dataflow 服務會停止處理所有資料,包括緩衝資料。詳情請參閱「取消工作」。
排除工作。這個方法僅適用於串流管道。 排空工作可讓 Dataflow 服務完成緩衝資料的處理作業,同時停止擷取新資料。詳情請參閱「排空工作」。
強制取消工作。這個方法適用於串流管道和批次管道。強制取消工作後,Dataflow 服務會立即停止處理任何資料,包括緩衝資料。強制取消前,請先嘗試正常取消。 強制取消僅適用於卡在一般取消程序中的工作。詳情請參閱「強制取消工作」。
取消工作後,就無法重新啟動。如果未使用彈性範本,可以複製已取消的管道,然後從複製的管道啟動新作業。
停止串流管道前,請考慮建立作業的快照。Dataflow 快照會儲存串流管道的狀態,方便您啟動新版 Dataflow 工作,而不會失去狀態。詳情請參閱使用 Dataflow 快照。
如果管道較為複雜,建議建立範本,並從範本執行工作。
您無法刪除 Dataflow 工作,但可以封存已完成的工作。所有已完成的工作 (包括封存工作清單中的工作),都會在 30 天保留期限過後刪除。
取消 Dataflow 工作
取消工作後,Dataflow 服務會立即停止工作。
取消工作後,會發生下列情況:
Dataflow 服務會停止所有資料擷取和處理作業。
Dataflow 服務會開始清除與工作相關聯的 Google Cloud資源。
這類資源可能包含關閉 Compute Engine 工作站執行個體,以及關閉連到 I/O 來源或接收器的有效連線。
取消工作的重要資訊
取消工作會立即停止管道的處理作業。
取消工作時,您可能會遺失傳輸中的資料。「傳輸中」的資料是指已讀取,但管道仍在處理的資料。
在您取消工作前,從管道寫入輸出接收器的資料,在輸出接收器上也許仍可存取。
如果不擔心資料遺失,取消工作可確保與工作相關聯的Google Cloud 資源盡快關閉。
排空 Dataflow 工作
排除工作時,Dataflow 服務會在工作的目前狀態下完成工作。如要在停止串流管道時避免資料遺失,最佳做法是排除工作。
排除工作時會發生下列情況:
收到排除要求後,您的工作很快就會停止從輸入來源擷取新資料 (通常在幾分鐘內)。
Dataflow 服務會保留工作站執行個體等任何現有資源,以完成管道內任何已緩衝資料的處理與寫入作業。
所有待處理的處理和寫入作業完成時,Dataflow 服務會關閉與工作相關聯的 Google Cloud 資源。
如要排空工作,Dataflow 會停止讀取新輸入內容,並以無限大的事件時間戳記標記來源,然後透過管道傳播無限大的時間戳記。因此,正在排空的管道可能會出現無限的水印。
排空作業的重要資訊
批次管道不支援排除作業。
所有處理和寫入作業完成前,管道會持續產生相關聯資源的維護費用。Google Cloud
您可以更新正在排除的管道。如果管道停滯,請使用修正問題的程式碼更新管道,即可順利排空管道,且不會遺失資料。
你可以取消目前正在耗電的工作。
排除作業可能需要大量時間才能完成,例如管道中有大量緩衝資料時。
如果串流管道包含 Splittable DoFn,則必須先截斷結果,才能執行排除選項。如要進一步瞭解如何截斷 Splittable DoFn,請參閱 Apache Beam 說明文件。
在某些情況下,Dataflow 工作可能無法完成排空作業。您可以查看工作記錄,找出根本原因並採取適當行動。
資料保留
Dataflow 串流可容許工作站重新啟動,發生錯誤時也不會導致串流工作失敗。而是會持續重試,直到您採取行動 (例如取消或重新啟動工作) 為止。排空工作時,Dataflow 會持續重試,這可能會導致管道停滯。在這種情況下,如要順利排空管道,且不會遺失資料,請使用可修正問題的程式碼更新管道。
Dataflow 服務會先永久提交訊息,才會確認訊息。舉例來說,使用 Kafka 時,您可以將這個程序視為安全地將訊息擁有權從 Kafka 移交給 Dataflow,消除資料遺失的風險。
停滯的工作
- 排除作業無法修正管道停滯問題。如果資料移動遭到封鎖,管道會在排空指令後停滯。如要解決管道停滯的問題,請使用 update 指令更新管道,並加入可解決問題的程式碼。您也可以取消停滯的工作,但取消工作可能會導致資料遺失。
計時器
如果串流管道程式碼包含迴圈計時器,工作可能會變慢或無法排空。由於所有計時器完成後,排空作業才會結束,因此具有無限迴圈計時器的管道永遠不會完成排空作業。
Dataflow 會等到所有處理時間計時器完成,而不是立即觸發,這可能會導致排空作業緩慢。
排除工作造成的影響
排除串流管道時,Dataflow 會立即關閉任何處理中的時間區間,並啟動所有觸發條件。
在排空作業中,系統不會等待任何未完成的時間型時間區間完成。
舉例來說,當您排除工作時,管道已在兩小時的時間區間執行了十分鐘,則 Dataflow 不會等待該時間區間的剩餘時間結束,它會立即結束該時段,而且只會擁有部分結果。Dataflow 會將資料浮水印提高至無限大,藉此關閉尚未完成的時間區間。此功能也適用於自訂資料來源。
排除使用自訂資料來源類別的管道時,Dataflow 會停止發出新資料要求、將資料浮水印提高至無限大,並於最後一個檢查點呼叫來源的 finalize()
方法。
如果排空,視窗可能只會部分填滿。在這種情況下,如果重新啟動已耗盡的管道,同一個視窗可能會再次觸發,進而導致資料發生問題。舉例來說,在下列情境中,檔案可能會有名稱衝突,且資料可能會遭到覆寫:
如果您在下午 12:34 排空管道,則下午 12:00 到下午 1:00 的時間範圍只會關閉,且只會包含時間範圍前 34 分鐘內觸發的資料。管道在下午 12:34 後不會讀取新資料。
如果您隨即重新啟動管道,系統會再次觸發下午 12:00 至下午 1:00 的時間範圍,但只會讀取下午 12:35 至下午 1:00 的資料。系統不會傳送重複項目,但如果檔案名稱重複,資料就會遭到覆寫。
在 Google Cloud 控制台中,您可以查看管道轉換的詳細資料。下圖顯示進行中排空作業的效果。請注意,浮水印已提高至最大值。
圖 1. 排空作業的步驟檢視畫面。
強制取消 Dataflow 工作
只有在無法使用其他方法取消工作時,才使用強制取消。 強制取消會終止工作,但不會清除所有資源。 如果重複使用強制取消功能,可能會累積洩漏的資源,而這些資源會占用配額。
強制取消工作時,Dataflow 服務會立即停止工作,並洩漏 Dataflow 工作建立的任何 VM。強制取消前,請先嘗試正常取消,且必須在至少 30 分鐘前進行。
強制取消工作時,會發生下列情況:
- Dataflow 服務會停止所有資料擷取和處理作業。
強制取消工作的重要資訊
強制取消工作會立即停止管道的處理作業。
只有在工作卡在一般取消程序時,才應強制取消工作。
Dataflow 工作建立的任何工作站執行個體不一定會釋出,這可能會導致工作站執行個體洩漏。洩漏的工作站執行個體不會計入工作費用,但可能會使用配額。工作取消後,您就可以刪除這些資源。
如果是 Dataflow Prime 工作,您無法查看或刪除洩漏的 VM。 在大多數情況下,這些 VM 不會造成問題。不過,如果洩漏的 VM 造成問題 (例如耗用 VM 配額),請與支援團隊聯絡。
停止 Dataflow 工作
停止工作前,請務必瞭解取消、排空或強制取消工作會造成哪些影響。
控制台
前往 Dataflow 的「Jobs」(工作) 頁面:
按一下要停止的工作。
如要停止工作,工作狀態必須為「執行中」。
在工作詳細資料頁面中,按一下「停止」。
執行下列其中一個步驟:
如果是批次管道,請按一下「取消」或「強制取消」。
如果是串流管道,請按一下「取消」、「排除」或「強制取消」。
如要確認所選動作,請按一下「停止工作」。
gcloud
如要排空或取消 Dataflow 工作,請在 Cloud Shell 或已安裝 gcloud CLI 的本機終端機中,使用 gcloud dataflow jobs
指令。
登入 Shell。
列出目前執行的 Dataflow 工作 ID,然後記下要停止的工作 ID:
gcloud dataflow jobs list
如果未設定
--region
旗標,系統會顯示所有可用區域的 Dataflow 工作。執行下列其中一個步驟:
如要排空串流工作:
gcloud dataflow jobs drain JOB_ID
將
JOB_ID
換成您先前複製的工作 ID。如要取消批次或串流工作:
gcloud dataflow jobs cancel JOB_ID
將
JOB_ID
換成您先前複製的工作 ID。如要強制取消批次或串流工作:
gcloud dataflow jobs cancel JOB_ID --force
將
JOB_ID
換成您先前複製的工作 ID。
API
如要使用 Dataflow REST API 取消或排空工作,可以選擇 projects.locations.jobs.update
或 projects.jobs.update
。在要求主體中,於所選 API 的工作執行個體 requestedState
欄位中,傳遞必要工作狀態。
重要事項:建議使用 projects.locations.jobs.update
,因為 projects.jobs.update
只能更新在 us-central1
中執行的工作狀態。
如要取消工作,請將工作狀態設為
JOB_STATE_CANCELLED
。如要排空工作,請將工作狀態設為
JOB_STATE_DRAINED
。如要強制取消工作,請使用
"force_cancel_job": "true"
標籤將工作狀態設為JOB_STATE_CANCELLED
。 要求主體為:{ "requestedState": "JOB_STATE_CANCELLED", "labels": { "force_cancel_job": "true" } }
偵測 Dataflow 工作完成情況
如要偵測工作取消或排空作業何時完成,請使用下列其中一種方法:
- 使用 Cloud Composer 等工作流程自動化調度管理服務,監控 Dataflow 工作。
- 以同步方式執行管道,讓工作遭到封鎖,直到管道完成為止。 詳情請參閱「設定管道選項」中的「控管執行模式」。
使用 Google Cloud CLI 中的指令列工具輪詢工作狀態。 如要取得專案中所有 Dataflow 工作的清單,請在殼層或終端機中執行下列指令:
gcloud dataflow jobs list
輸出內容會顯示每個工作的 ID、名稱、狀態 (
STATE
) 和其他資訊。詳情請參閱使用 Google Cloud CLI 列出作業。
封存已完成的 Dataflow 工作
封存 Dataflow 工作後,該工作就會從主控台 Dataflow「Jobs」(工作) 頁面的工作清單中移除。工作會移至封存的工作清單。您只能封存已完成的工作,包括處於下列狀態的工作:
JOB_STATE_CANCELLED
JOB_STATE_DRAINED
JOB_STATE_DONE
JOB_STATE_FAILED
JOB_STATE_UPDATED
如要進一步瞭解如何驗證這些狀態,請參閱「偵測 Dataflow 工作完成情況」。
如要瞭解如何排解封存工作時發生的問題,請參閱「排解 Dataflow 錯誤」一文中的「封存工作錯誤」。
所有封存的工作會在 30 天保留期限過後刪除。
封存工作
如要從 Dataflow「工作」頁面的主要工作清單中移除已完成的工作,請按照下列步驟操作。
主控台
前往 Google Cloud 控制台的 Dataflow「Jobs」(工作) 頁面。
畫面上會顯示 Dataflow 工作清單,清單中也會顯示工作的狀態。
選取職務。
在「Job Details」(工作詳細資料) 頁面中,按一下「Archive」(封存)。如果工作尚未完成,就無法使用「封存」選項。
REST
如要使用 API 封存工作,請使用 projects.locations.jobs.update
方法。
在這項要求中,您必須指定更新後的 JobMetadata
物件。在 JobMetadata.userDisplayProperties
物件中,使用鍵/值組合 "archived":"true"
。
除了更新 JobMetadata
物件,API 要求也必須在要求網址中加入 updateMask 查詢參數:
https://dataflow.googleapis.com/v1b3/[...]/jobs/JOB_ID/?updateMask=job_metadata.user_display_properties.archived
使用任何要求資料之前,請先替換以下項目:
- PROJECT_ID:您的專案 ID
- REGION:Dataflow 區域
- JOB_ID:Dataflow 工作的 ID
HTTP 方法和網址:
PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID/?updateMask=job_metadata.user_display_properties.archived
JSON 要求主體:
{ "job_metadata": { "userDisplayProperties": { "archived": "true" } } }
如要傳送要求,請選擇以下其中一個選項:
curl
將要求主體儲存在名為 request.json
的檔案中,然後執行下列指令:
curl -X PUT \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json; charset=utf-8" \
-d @request.json \
"https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID/?updateMask=job_metadata.user_display_properties.archived"
PowerShell
將要求主體儲存在名為 request.json
的檔案中,然後執行下列指令:
$cred = gcloud auth print-access-token
$headers = @{ "Authorization" = "Bearer $cred" }
Invoke-WebRequest `
-Method PUT `
-Headers $headers `
-ContentType: "application/json; charset=utf-8" `
-InFile request.json `
-Uri "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID/?updateMask=job_metadata.user_display_properties.archived" | Select-Object -Expand Content
您應該會收到如下的 JSON 回應:
{ "id": "JOB_ID", "projectId": "PROJECT_ID", "currentState": "JOB_STATE_DONE", "currentStateTime": "2025-05-20T20:54:41.651442Z", "createTime": "2025-05-20T20:51:06.031248Z", "jobMetadata": { "userDisplayProperties": { "archived": "true" } }, "startTime": "2025-05-20T20:51:06.031248Z" }
查看及還原封存的工作
請按照下列步驟查看封存的工作,或將封存的工作還原至 Dataflow「Jobs」(工作) 頁面的主要工作清單。
主控台
前往 Google Cloud 控制台的 Dataflow「Jobs」(工作) 頁面。
按一下「已封存」切換按鈕。系統會顯示已封存的 Dataflow 工作清單。
選取職務。
如要將工作還原至 Dataflow「Jobs」(工作) 頁面的主要工作清單,請在「Job Details」(工作詳細資料) 頁面中按一下「Restore」(還原)。
REST
如要使用 API 還原已封存的工作,請使用 projects.locations.jobs.update
方法。
在這項要求中,您必須指定更新後的 JobMetadata
物件。在 JobMetadata.userDisplayProperties
物件中,使用鍵/值組合 "archived":"false"
。
除了更新 JobMetadata
物件,API 要求也必須在要求網址中加入 updateMask 查詢參數:
https://dataflow.googleapis.com/v1b3/[...]/jobs/JOB_ID/?updateMask=job_metadata.user_display_properties.archived
使用任何要求資料之前,請先替換以下項目:
- PROJECT_ID:您的專案 ID
- REGION:Dataflow 區域
- JOB_ID:Dataflow 工作的 ID
HTTP 方法和網址:
PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID/?updateMask=job_metadata.user_display_properties.archived
JSON 要求主體:
{ "job_metadata": { "userDisplayProperties": { "archived": "false" } } }
如要傳送要求,請選擇以下其中一個選項:
curl
將要求主體儲存在名為 request.json
的檔案中,然後執行下列指令:
curl -X PUT \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json; charset=utf-8" \
-d @request.json \
"https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID/?updateMask=job_metadata.user_display_properties.archived"
PowerShell
將要求主體儲存在名為 request.json
的檔案中,然後執行下列指令:
$cred = gcloud auth print-access-token
$headers = @{ "Authorization" = "Bearer $cred" }
Invoke-WebRequest `
-Method PUT `
-Headers $headers `
-ContentType: "application/json; charset=utf-8" `
-InFile request.json `
-Uri "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID/?updateMask=job_metadata.user_display_properties.archived" | Select-Object -Expand Content
您應該會收到如下的 JSON 回應:
{ "id": "JOB_ID", "projectId": "PROJECT_ID", "currentState": "JOB_STATE_DONE", "currentStateTime": "2025-05-20T20:54:41.651442Z", "createTime": "2025-05-20T20:51:06.031248Z", "jobMetadata": { "userDisplayProperties": { "archived": "false" } }, "startTime": "2025-05-20T20:51:06.031248Z" }
後續步驟
- 探索 Dataflow REST API。
- 在 Google Cloud 控制台中探索 Dataflow 監控介面。
- 進一步瞭解如何更新管道。