使用 Dataflow 串流變更
透過 Bigtable Beam 連接器,您可以使用 Dataflow 讀取 Bigtable 資料變更記錄,不必在程式碼中追蹤或處理分割區變更,因為連接器會為您處理這項邏輯。
本文說明如何設定及使用 Bigtable Beam 連接器,透過 Dataflow 管道讀取變更串流。閱讀本文之前,請先參閱變更串流總覽,並熟悉 Dataflow。
建構自有管道的替代方案
如果您不想自行建構 Dataflow 管道,可以採用下列其中一種做法。
您可以使用 Google 提供的 Dataflow 範本。
您也可以使用 Bigtable 教學課程或快速入門中的程式碼範例,做為程式碼的起點。
請確認您產生的程式碼使用 google cloud libraries-bom
26.14.0 以上版本。
連接器詳細資料
Bigtable Beam 連接器方法 BigtableIO.readChangeStream
可讓您讀取資料變更記錄 (ChangeStreamMutation
) 串流,並進行處理。Bigtable Beam 連接器是 Apache Beam GitHub 存放區的元件。如需連接器程式碼的說明,請參閱 BigtableIO.java
中的註解。
您必須搭配 Beam 2.48.0 以上版本使用這個連接器。請查看 Apache Beam 執行階段支援,確認您使用的是支援的 Java 版本。接著,您可以部署使用連接器的管道至 Dataflow,由該服務處理資源的佈建和管理作業,並協助提升串流資料處理作業的擴充性和可靠性。
如要進一步瞭解 Apache Beam 程式設計模型,請參閱 Beam 說明文件。
沒有活動時間的分組資料
使用 Bigtable Beam 連接器串流的資料變更記錄,與依據事件時間的 Dataflow 函式不相容。
如「複製和水位線」一文所述,如果分區的複製作業尚未趕上執行個體的其餘部分,低水位線可能不會前進。如果低水位線停止前進,變更串流可能會停滯。
為避免串流停滯,Bigtable Beam 連接器會輸出所有資料,輸出時間戳記為零。零時間戳記會導致 Dataflow 將所有資料變更記錄視為延遲資料。因此,依賴事件時間的 Dataflow 功能與 Bigtable 變更串流不相容。具體來說,您無法使用視窗函式、事件時間觸發條件或事件時間計時器。
您可以改用 GlobalWindows 和非事件時間觸發條件,將這些延遲資料分組到窗格中,如教學課程範例所示。如要瞭解觸發條件和窗格的詳細資料,請參閱 Beam 程式設計指南中的「觸發條件」。
自動調度資源
這個連接器支援 Dataflow 自動調度資源,使用 Runner v2 (必要) 時,這項功能預設為啟用。Dataflow 自動調度資源演算法會考量預估的變更串流待處理項目,您可以在 Backlog
區段的「Dataflow 監控」頁面監控這項資訊。部署工作時,請使用 --maxNumWorkers
標記來限制工作站數量。
如要手動調度管道資源,而非使用自動調度功能,請參閱「手動調度串流管道資源」。
限制
使用 Bigtable Beam 連接器搭配 Dataflow 前,請注意下列限制。
Dataflow Runner V2
連接器只能使用 Dataflow Runner v2 執行。如要啟用這項功能,請在指令列引數中指定 --experiments=use_runner_v2
。使用 Runner v1 執行管道時,管道會失敗並出現下列例外狀況:
java.lang.UnsupportedOperationException: BundleFinalizer unsupported by non-portable Dataflow
快照
連接器不支援 Dataflow 快照。
重複項目
Bigtable Beam 連接器會依提交時間戳記順序,串流處理每個資料列鍵和每個叢集的變更,但由於有時會從串流中較早的時間重新啟動,因此可能會產生重複項目。
重新啟動管道
如果 Dataflow 管道已停止運作一段時間,資料變更記錄可能會超出保留期限。管道恢復後,Bigtable 會讓管道失敗,以便您使用保留期限內的新要求開始時間,啟動新管道。Bigtable 會這麼做,而不是默默地將原始管道的要求時間提前,以免時間戳記超出指定保留期限的資料變更記錄遭到意外捨棄。
事前準備
使用連接器前,請先完成下列必要條件。
設定驗證方法
如要在本機開發環境中使用本頁的 Java 範例,請安裝並初始化 gcloud CLI,然後使用使用者憑證設定應用程式預設憑證。
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
詳情請參閱 Set up authentication for a local development environment。
如要瞭解如何設定正式版環境的驗證作業,請參閱 Set up Application Default Credentials for code running on Google Cloud。
啟用變更串流
您必須先啟用資料表的變更串流,才能讀取資料表。您也可以建立新資料表,並啟用變更串流。
變更串流中繼資料表
使用 Dataflow 串流變更時,Bigtable Beam 連接器會建立名為 __change_stream_md_table
的中繼資料表 (預設名稱)。變更串流中繼資料表會管理連接器的運作狀態,並儲存資料變更記錄的中繼資料。
根據預設,連接器會在與串流資料表相同的執行個體中建立資料表。為確保資料表正常運作,中繼資料表的應用程式設定檔必須使用單叢集轉送,並啟用單一資料列交易。
如要進一步瞭解如何使用 Bigtable Beam 連接器,以串流方式傳輸 Bigtable 的變更,請參閱 BigtableIO 說明文件。
必要的角色
如要取得使用 Dataflow 讀取 Bigtable 變更串流所需的權限,請要求管理員授予下列 IAM 角色。
如要從 Bigtable 讀取變更,您需要具備下列角色:
- Bigtable 系統管理員 (roles/bigtable.admin) 在包含您要串流變更的資料表的 Bigtable 執行個體上
如要執行 Dataflow 工作,您需要下列角色:
- 專案的 Dataflow 開發人員 (
roles/dataflow.developer
),其中包含您的 Cloud 資源 - 專案中的 Dataflow 工作者 (roles/dataflow.worker),該專案包含 Cloud 資源
- 您打算使用的 Cloud Storage 值區的 Storage 物件管理員 (roles/storage.objectAdmin)
如要進一步瞭解如何授予角色,請參閱管理存取權。
將 Bigtable Beam 連接器新增為依附元件
在 Maven pom.xml 檔案中加入類似下列依附元件的程式碼。版本必須為 2.48.0 以上。
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>VERSION</version>
</dependency>
</dependencies>
讀取變更串流
如要建構 Dataflow pipeline 來讀取資料變更記錄,請設定連接器,然後新增轉換和接收器。然後使用連接器在 Beam 管道中讀取 ChangeStreamMutation
物件。
本節中的 Java 程式碼範例說明如何建構管道,並使用管道將鍵/值配對轉換為字串。每個配對都包含資料列鍵和 ChangeStreamMutation
物件。管道會將每個物件的項目轉換為以半形逗號分隔的字串。
建立管道
這個 Java 程式碼範例示範如何建構管道:
處理資料變更記錄
這個範例說明如何逐一查看資料列的資料變更記錄中的所有項目,並根據項目類型呼叫轉換為字串的方法。
如需資料變更記錄可包含的項目類型清單,請參閱「資料變更記錄的內容」。
在本範例中,系統會轉換 write 項目:
在本範例中,系統會轉換「刪除儲存格」項目:
在本範例中,系統會轉換「刪除資料欄系列」項目:
監控
在 Google Cloud 主控台中,您可以透過下列資源監控Google Cloud 資源,同時執行 Dataflow 管道來讀取 Bigtable 變更串流:
請特別查看下列指標:
- 在 Bigtable 系統洞察頁面中,查看下列指標:
- 指標
cpu_load_by_app_profile_by_method_by_table
中的「變更串流的 CPU 使用率」資料。顯示變更串流對叢集 CPU 使用率的影響。 - 變更串流儲存空間使用率 (位元組)
(
change_stream_log_used_bytes
)。
- 指標
在 Dataflow 監控頁面中,檢查資料新鮮度。這項指標會顯示目前時間與浮水印時間的差異 (約為兩分鐘),偶爾會出現尖峰,時間長度會多出一到兩分鐘。資料更新間隔不會指出資料變更記錄的處理速度是否緩慢。為確保重要應用程式的健康狀態和效能持續良好,請監控 Dataflow 資料即時性指標,並採取下列行動:
- 如果資料更新間隔指標持續高於門檻,管道可能資源不足。建議您新增更多 Dataflow 工作站。
- 如果 Dataflow 工作人員的資源充足,但資料新鮮度持續降低或一直偏低,請與Google Cloud 支援團隊聯絡。
Dataflow
processing_delay_from_commit_timestamp_MEAN
指標可顯示工作生命週期內資料變更記錄的平均處理時間。
監控讀取 Bigtable 變更串流的 Dataflow 管道時,Bigtable server/latencies
指標並不實用,因為該指標反映的是串流要求持續時間,而非資料變更記錄處理延遲時間。變更串流中的高延遲時間並不代表要求處理速度緩慢,而是表示連線已開啟這麼久。