受管理 I/O 支援讀取及寫入 Apache Kafka。
需求條件
下列 SDK 支援 Apache Kafka 的受管理 I/O:
- Java 適用的 Apache Beam SDK 2.58.0 以上版本
- Python 適用的 Apache Beam SDK 2.61.0 以上版本
設定
Managed I/O 會使用下列 Apache Kafka 設定參數。
讀取及寫入設定 | 資料類型 | 說明 |
---|---|---|
bootstrap_servers |
字串 | 必要。以半形逗號分隔的 Kafka 啟動伺服器清單。
範例:localhost:9092 。 |
topic |
字串 | 必要。要讀取或寫入的 Kafka 主題。 |
file_descriptor_path |
字串 | 通訊協定緩衝區檔案描述元集的路徑。僅在 data_format 為 "PROTO" 時適用。 |
data_format |
字串 | 訊息格式。支援的值:"AVRO" 、"JSON" 、"PROTO" 、"RAW" 。預設值為 "RAW" ,可讀取或寫入訊息酬載的原始位元組。 |
message_name |
字串 | 通訊協定緩衝區訊息的名稱。如果 data_format 為 "PROTO" ,則為必要欄位。 |
schema |
字串 | Kafka 訊息結構定義。預期的結構定義類型取決於資料格式:
如果是讀取管道,如果已設定 |
讀取設定 | 資料類型 | 說明 |
auto_offset_reset_config |
字串 | 指定在 Kafka 伺服器上沒有初始偏移或目前偏移不再存在時的行為。支援的值如下:
預設值為 |
confluent_schema_registry_subject |
字串 | Confluent 結構定義登錄的主體。如果指定 confluent_schema_registry_url ,則為必要欄位。 |
confluent_schema_registry_url |
字串 | Confluent 結構定義登錄的網址。如果指定,系統會忽略 schema 參數。 |
consumer_config_updates |
地圖 | 設定 Kafka 消費者的設定參數。詳情請參閱 Kafka 說明文件中的「 消費者設定」。您可以使用這個參數自訂 Kafka 消費者。 |
max_read_time_seconds |
整數 | 讀取時間上限 (以秒為單位)。這個選項會產生有界限的 PCollection ,主要用於測試或其他非生產情境。 |
撰寫設定 | 資料類型 | 說明 |
producer_config_updates |
地圖 | 設定 Kafka 製作工具的設定參數。詳情請參閱 Kafka 說明文件中的「 Producer configs」。您可以使用這個參數自訂 Kafka 產生器。 |
如要讀取 Avro 或 JSON 訊息,您必須指定訊息結構定義。如要直接設定結構定義,請使用 schema
參數。如要透過 Confluent 結構定義登錄檔提供結構定義,請設定 confluent_schema_registry_url
和 confluent_schema_registry_subject
參數。
如要讀取或寫入通訊協定緩衝區訊息,請指定訊息結構定義或設定 file_descriptor_path
參數。
如需更多資訊和程式碼範例,請參閱下列主題: