本頁面說明如何在 Dataflow 管道中,使用 Google Cloud Managed Service for Apache Kafka 做為來源或接收器。
您可以採用下列任一做法:
需求條件
在專案中啟用 Cloud Storage、Dataflow 和 Managed Service for Apache Kafka API。請參閱「啟用 API」一文,或執行下列 Google Cloud CLI 指令:
gcloud services enable dataflow.googleapis.com managedkafka.googleapis.com storage-component.googleapis.com
Dataflow工作站服務帳戶必須具備代管 Kafka 用戶端 (
roles/managedkafka.client
) Identity and Access Management (IAM) 角色。Dataflow 工作站 VM 必須具備 Kafka 啟動伺服器的網路存取權。詳情請參閱「設定 Managed Service for Apache Kafka 網路」。
取得啟動伺服器位址
如要執行連結至 Managed Service for Apache Kafka 叢集的管道,請先取得叢集的啟動伺服器位址。設定管道時需要這個地址。
您可以使用 Google Cloud 控制台或 Google Cloud CLI,如下所示:
控制台
前往 Google Cloud 控制台的「Clusters」(叢集) 頁面。
按一下叢集名稱。
按一下 [設定] 分頁標籤。
從「啟動網址」複製啟動伺服器位址。
gcloud
使用 managed-kafka clusters describe
指令。
gcloud managed-kafka clusters describe CLUSTER_ID \
--location=LOCATION \
--format="value(bootstrapAddress)"
更改下列內容:
- CLUSTER_ID:叢集的 ID 或名稱
- LOCATION:叢集位置
詳情請參閱「查看 Managed Service for Apache Kafka 叢集」。
搭配 Dataflow 範本使用 Managed Service for Apache Kafka
Google 提供多個可從 Apache Kafka 讀取資料的 Dataflow 範本:
這些範本可用於 Managed Service for Apache Kafka。如果其中一個符合您的用途,建議使用該函式,不要自行編寫自訂管道程式碼。
控制台
前往「Dataflow」>「Jobs」(工作) 頁面。
按一下 [Create job from template] (利用範本建立工作)。
在「工作名稱」中,輸入工作名稱。
從「Dataflow」範本下拉式選單中,選取要執行的範本。
在「Kafka bootstrap server」(Kafka 啟動伺服器) 方塊中,輸入啟動伺服器位址。
在「Kafka topic」(Kafka 主題) 方塊中,輸入主題名稱。
在「Kafka authentication mode」(Kafka 驗證模式) 部分,選取「APPLICATION_DEFAULT_CREDENTIALS」。
在「Kafka message format」(Kafka 訊息格式) 部分,選取 Apache Kafka 訊息的格式。
視需要輸入其他參數。每個範本支援的參數都有記錄。
執行工作。
gcloud
使用 gcloud dataflow jobs run
指令。
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://TEMPLATE_FILE \
--region REGION_NAME \
--parameters \
readBootstrapServerAndTopic=projects/PROJECT_NAME/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS\,
# Other parameters, depending on the template
# ...
更改下列內容:
- JOB_NAME:工作名稱
- TEMPLATE_FILE:範本檔案在 Cloud Storage 中的位置
- REGION_NAME:要部署工作的區域
- PROJECT_NAME:您的 Google Cloud 專案名稱
- LOCATION:叢集位置
- CLUSTER_ID:叢集的 ID 或名稱
- TOPIC:Kafka 主題的名稱
搭配 Beam 管道使用 Managed Service for Apache Kafka
本節說明如何使用 Apache Beam SDK 建立及執行 Dataflow 管道,並連線至 Managed Service for Apache Kafka。
在大多數情況下,請使用受管理 I/O 轉換做為 Kafka 來源或接收器。如要進一步調整效能,請考慮使用 KafkaIO
連接器。如要進一步瞭解使用受管理 I/O 的優點,請參閱「Dataflow 受管理 I/O」一文。
需求條件
Kafka 用戶端 3.6.0 以上版本。
Apache Beam SDK 2.61.0 以上版本。
啟動 Dataflow 工作時,您必須透過網路存取 Apache Kafka 啟動伺服器。舉例來說,您可以從可存取叢集所在虛擬私有雲的 Compute Engine 執行個體啟動作業。
建立工作的主體必須具備下列 IAM 角色:
- Managed Kafka Client (
roles/managedkafka.client
) 存取 Apache Kafka 叢集。 - 服務帳戶使用者 (
roles/iam.serviceAccountUser
) 角色,以 Dataflow 工作站服務帳戶身分執行作業。 - Storage 管理員 (
roles/storage.admin
) 將作業檔案上傳至 Cloud Storage。 - Dataflow 管理員 (
roles/dataflow.admin
) 建立工作。
如果您從 Compute Engine 執行個體啟動工作,可以將這些角色授予附加至 VM 的服務帳戶。詳情請參閱「建立使用使用者管理服務帳戶的 VM」。
建立作業時,您也可以搭配服務帳戶模擬使用應用程式預設憑證 (ADC)。
- Managed Kafka Client (
設定受管理 I/O
如果管道使用 Managed I/O for Apache Kafka,請設定下列設定選項,向 Managed Service for Apache Kafka 進行驗證:
"security.protocol"
:"SASL_SSL"
"sasl.mechanism"
:"OAUTHBEARER"
"sasl.login.callback.handler.class"
:"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
"sasl.jaas.config"
:"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
以下範例說明如何為 Managed Service for Apache Kafka 設定代管 I/O:
Java
// Create configuration parameters for the Managed I/O transform.
ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
.put("bootstrap_servers", options.getBootstrapServer())
.put("topic", options.getTopic())
.put("data_format", "RAW")
// Set the following fields to authenticate with Application Default
// Credentials (ADC):
.put("security.protocol", "SASL_SSL")
.put("sasl.mechanism", "OAUTHBEARER")
.put("sasl.login.callback.handler.class",
"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler")
.put("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;")
.build();
Python
pipeline
| beam.managed.Read(
beam.managed.KAFKA,
config={
"bootstrap_servers": options.bootstrap_server,
"topic": options.topic,
"data_format": "RAW",
# Set the following fields to authenticate with Application Default
# Credentials (ADC):
"security.protocol": "SASL_SSL",
"sasl.mechanism": "OAUTHBEARER",
"sasl.login.callback.handler.class":
"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
"sasl.jaas.config":
"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
}
)
設定 KafkaIO
連接器
以下範例說明如何為 Managed Service for Apache Kafka 設定 KafkaIO
連接器:
Java
String bootstap = options.getBootstrap();
String topicName = options.getTopic();
// Read from Kafka
pipeline.apply(KafkaIO.<String, String>read()
.withBootstrapServers(bootstap)
.withTopic(topicName)
.withKeyDeserializer(IntegerSerializer.class)
.withValueDeserializer(StringDeserializer.class)
.withGCPApplicationDefaultCredentials())
// Write to Kafka
pipeline.apply(KafkaIO.<Integer, String>write()
.withBootstrapServers(bootstrap)
.withTopic(topicName)
.withKeySerializer(IntegerSerializer.class)
.withValueSerializer(StringSerializer.class)
.withGCPApplicationDefaultCredentials());
Python
WriteToKafka(
producer_config={
"bootstrap.servers": options.bootstrap_servers,
"security.protocol": 'SASL_SSL',
"sasl.mechanism": "OAUTHBEARER",
"sasl.login.callback.handler.class": "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
"sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
},
topic=options.topic,
key_serializer=("org.apache.kafka.common.serialization." "LongSerializer"),
value_serializer=("org.apache.kafka.common.serialization." "StringSerializer")
)
後續步驟
- 進一步瞭解 Managed Service for Apache Kafka。
- 將資料從 Managed Service for Apache Kafka 寫入 BigQuery。
- 從 Apache Kafka 讀取資料到 Dataflow。
- 從 Dataflow 寫入 Apache Kafka。