Dataflow 管理的 I/O

透過代管 I/O,Dataflow 可管理 Apache Beam 管道中使用的特定 I/O 連接器。透過代管 I/O,您可以輕鬆管理與支援來源和接收器整合的管道。

受管理 I/O 由兩個相互搭配的元件組成:

  • Apache Beam 轉換,提供用於建立 I/O 連接器 (來源和接收器) 的通用 API。

  • Dataflow 服務會代您管理這些 I/O 連接器,包括獨立升級連接器 (與 Apache Beam 版本無關) 的功能。

受管理 I/O 的優點包括:

  • 自動升級:Dataflow 會自動升級管道中的受管理 I/O 連接器。也就是說,您的管道會收到這些連接器的安全性修正、效能提升和錯誤修正,無須變更任何程式碼。詳情請參閱「自動升級」。

  • 一致的 API。在 Apache Beam 中,I/O 連接器通常有不同的 API,且每個連接器的設定方式都不一樣。受管理 I/O 提供單一設定 API,可使用鍵/值屬性,因此管道程式碼更簡單且一致。詳情請參閱 Configuration API

需求條件

  • 下列 SDK 支援受管理 I/O:

    • Java 適用的 Apache Beam SDK 2.58.0 以上版本。
    • Python 適用的 Apache Beam SDK 2.61.0 以上版本。
  • 後端服務需要 Dataflow Runner v2。如果未啟用 Runner v2,管道仍會執行,但無法享有受管理 I/O 服務的好處。

自動升級

使用代管 I/O 連接器的 Dataflow 管道會自動使用最新可靠的連接器版本,如下所示:

  • 提交工作時,Dataflow 會使用經過測試且運作良好的最新版連接器。

  • 如果是串流工作,Dataflow 會在您啟動替代工作時檢查更新,並自動使用最新已知良好版本。即使您未變更任何程式碼,Dataflow 仍會執行這項檢查。

您不必擔心手動更新連接器或管道的 Apache Beam 版本。

下圖顯示升級程序。使用者使用 SDK 版本 X 建立 Apache Beam 管道。使用者提交工作時,Dataflow 會檢查 Managed I/O 的版本,並升級至 Y 版。

圖表:顯示 Managed I/O 升級程序。

升級程序會使工作啟動時間增加約兩分鐘。如要查看受管理 I/O 作業的狀態,請尋找包含「Managed Transform(s)」字串的記錄項目

設定 API

受管理 I/O 是 Apache Beam 的一站式轉換,提供一致的 API 來設定來源和接收器。

Java

如要建立 Managed I/O 支援的任何來源或接收器,請使用 Managed 類別。指定要例項化的來源或接收器,並傳遞一組設定參數,類似於下列項目:

Map config = ImmutableMap.<String, Object>builder()
    .put("config1", "abc")
    .put("config2", 1);

pipeline.apply(Managed.read(/*Which source to read*/).withConfig(config))
    .getSinglePCollection();

您也可以將設定參數做為 YAML 檔案傳送。如需完整程式碼範例,請參閱「從 Apache Iceberg 讀取資料」。

Python

匯入 apache_beam.transforms.managed 模組,並呼叫 managed.Readmanaged.Write 方法。指定要例項化的來源或接收器,並傳遞一組設定參數,類似於下列項目:

pipeline
| beam.managed.Read(
    beam.managed.SOURCE, # Example: beam.managed.KAFKA
    config={
      "config1": "abc",
      "config2": 1
    }
)

您也可以將設定參數做為 YAML 檔案傳送。如需完整程式碼範例,請參閱「從 Apache Kafka 讀取資料」。

動態目的地

對於某些接收器,受管理 I/O 連接器可以根據傳入記錄中的欄位值,動態選取目的地。

如要使用動態到達網頁,請提供到達網頁的範本字串。範本字串可在大括號內包含欄位名稱,例如 "tables.{field1}"。在執行階段,連接器會為每筆傳入的記錄替換欄位值,以判斷該記錄的目的地。

舉例來說,假設您的資料有名為 airport 的欄位。您可以將目的地設為 "flights.{airport}"。如果 airport=SFO,系統會將記錄寫入 flights.SFO。如為巢狀欄位,請使用點標記法。例如: {top.middle.nested}

如需示範如何使用動態目的地的程式碼範例,請參閱「使用動態目的地寫入」。

篩選

您可能想在特定欄位寫入目的地資料表前將其篩除。對於支援動態目的地的接收器,您可以使用 dropkeeponly 參數達成此目的。這些參數可讓您在輸入記錄中加入目的地中繼資料,而不必將中繼資料寫入目的地。

每個接收器最多只能設定一個這類參數。

設定參數 資料類型 說明
drop 字串清單 要捨棄的欄位名稱清單,然後再寫入目的地。
keep 字串清單 寫入目的地時要保留的欄位名稱清單。其他欄位會遭到捨棄。
only 字串 要用做頂層記錄的欄位名稱 (只能有一個),寫入目的地時會用到。 系統會捨棄其他所有欄位。這個欄位必須是列類型。

支援的來源和接收器

受管理 I/O 支援下列來源和接收器。