適用於 Apache Kafka 的 Dataflow 代管 I/O

受管理 I/O 支援讀取及寫入 Apache Kafka。

需求條件

下列 SDK 支援 Apache Kafka 的受管理 I/O:

  • Java 適用的 Apache Beam SDK 2.58.0 以上版本
  • Python 適用的 Apache Beam SDK 2.61.0 以上版本

設定

Managed I/O 會使用下列 Apache Kafka 設定參數。

讀取及寫入設定 資料類型 說明
bootstrap_servers 字串 必要。以半形逗號分隔的 Kafka 啟動伺服器清單。 範例:localhost:9092
topic 字串 必要。要讀取或寫入的 Kafka 主題。
file_descriptor_path 字串 通訊協定緩衝區檔案描述元集的路徑。僅在 data_format"PROTO" 時適用。
data_format 字串 訊息格式。支援的值:"AVRO""JSON""PROTO""RAW"。預設值為 "RAW",可讀取或寫入訊息酬載的原始位元組。
message_name 字串 通訊協定緩衝區訊息的名稱。如果 data_format"PROTO",則為必要欄位。
schema 字串

Kafka 訊息結構定義。預期的結構定義類型取決於資料格式:

如果是讀取管道,如果已設定 confluent_schema_registry_url,系統會忽略這個參數。

讀取設定 資料類型 說明
auto_offset_reset_config 字串

指定在 Kafka 伺服器上沒有初始偏移或目前偏移不再存在時的行為。支援的值如下:

  • "earliest":將位移重設為最早的位移。
  • "latest":將位移重設為最新位移。

預設值為 "latest"

confluent_schema_registry_subject 字串 Confluent 結構定義登錄的主體。如果指定 confluent_schema_registry_url,則為必要欄位。
confluent_schema_registry_url 字串 Confluent 結構定義登錄的網址。如果指定,系統會忽略 schema 參數。
consumer_config_updates 地圖 設定 Kafka 消費者的設定參數。詳情請參閱 Kafka 說明文件中的「 消費者設定」。您可以使用這個參數自訂 Kafka 消費者。
max_read_time_seconds 整數 讀取時間上限 (以秒為單位)。這個選項會產生有界限的 PCollection,主要用於測試或其他非生產情境。
撰寫設定 資料類型 說明
producer_config_updates 地圖 設定 Kafka 製作工具的設定參數。詳情請參閱 Kafka 說明文件中的「 Producer configs」。您可以使用這個參數自訂 Kafka 產生器。

如要讀取 Avro 或 JSON 訊息,您必須指定訊息結構定義。如要直接設定結構定義,請使用 schema 參數。如要透過 Confluent 結構定義登錄檔提供結構定義,請設定 confluent_schema_registry_urlconfluent_schema_registry_subject 參數。

如要讀取或寫入通訊協定緩衝區訊息,請指定訊息結構定義或設定 file_descriptor_path 參數。

如需更多資訊和程式碼範例,請參閱下列主題: