搭配使用 Dataflow 與 Managed Service for Apache Kafka

本頁面說明如何在 Dataflow 管道中,使用 Google Cloud Managed Service for Apache Kafka 做為來源或接收器。

您可以採用下列任一做法:

需求條件

  • 在專案中啟用 Cloud Storage、Dataflow 和 Managed Service for Apache Kafka API。請參閱「啟用 API」一文,或執行下列 Google Cloud CLI 指令:

    gcloud services enable dataflow.googleapis.com managedkafka.googleapis.com storage-component.googleapis.com
    
  • Dataflow工作站服務帳戶必須具備代管 Kafka 用戶端 (roles/managedkafka.client) Identity and Access Management (IAM) 角色。

  • Dataflow 工作站 VM 必須具備 Kafka 啟動伺服器的網路存取權。詳情請參閱「設定 Managed Service for Apache Kafka 網路」。

取得啟動伺服器位址

如要執行連結至 Managed Service for Apache Kafka 叢集的管道,請先取得叢集的啟動伺服器位址。設定管道時需要這個地址。

您可以使用 Google Cloud 控制台或 Google Cloud CLI,如下所示:

控制台

  1. 前往 Google Cloud 控制台的「Clusters」(叢集) 頁面。

    前往「Clusters」(叢集)

  2. 按一下叢集名稱。

  3. 按一下 [設定] 分頁標籤。

  4. 從「啟動網址」複製啟動伺服器位址。

gcloud

使用 managed-kafka clusters describe 指令。

gcloud managed-kafka clusters describe CLUSTER_ID \
  --location=LOCATION \
  --format="value(bootstrapAddress)"

更改下列內容:

  • CLUSTER_ID:叢集的 ID 或名稱
  • LOCATION:叢集位置

詳情請參閱「查看 Managed Service for Apache Kafka 叢集」。

搭配 Dataflow 範本使用 Managed Service for Apache Kafka

Google 提供多個可從 Apache Kafka 讀取資料的 Dataflow 範本:

這些範本可用於 Managed Service for Apache Kafka。如果其中一個符合您的用途,建議使用該函式,不要自行編寫自訂管道程式碼。

控制台

  1. 前往「Dataflow」>「Jobs」(工作) 頁面。

    前往「Jobs」(工作) 頁面

  2. 按一下 [Create job from template] (利用範本建立工作)

  3. 在「工作名稱」中,輸入工作名稱。

  4. 從「Dataflow」範本下拉式選單中,選取要執行的範本。

  5. 在「Kafka bootstrap server」(Kafka 啟動伺服器) 方塊中,輸入啟動伺服器位址。

  6. 在「Kafka topic」(Kafka 主題) 方塊中,輸入主題名稱。

  7. 在「Kafka authentication mode」(Kafka 驗證模式) 部分,選取「APPLICATION_DEFAULT_CREDENTIALS」

  8. 在「Kafka message format」(Kafka 訊息格式) 部分,選取 Apache Kafka 訊息的格式。

  9. 視需要輸入其他參數。每個範本支援的參數都有記錄。

  10. 執行工作

gcloud

使用 gcloud dataflow jobs run 指令。

gcloud dataflow jobs run JOB_NAME \
  --gcs-location gs://TEMPLATE_FILE \
  --region REGION_NAME \
  --parameters \
readBootstrapServerAndTopic=projects/PROJECT_NAME/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS\,
# Other parameters, depending on the template
# ...

更改下列內容:

  • JOB_NAME:工作名稱
  • TEMPLATE_FILE:範本檔案在 Cloud Storage 中的位置
  • REGION_NAME:要部署工作的區域
  • PROJECT_NAME:您的 Google Cloud 專案名稱
  • LOCATION:叢集位置
  • CLUSTER_ID:叢集的 ID 或名稱
  • TOPIC:Kafka 主題的名稱

搭配 Beam 管道使用 Managed Service for Apache Kafka

本節說明如何使用 Apache Beam SDK 建立及執行 Dataflow 管道,並連線至 Managed Service for Apache Kafka。

在大多數情況下,請使用受管理 I/O 轉換做為 Kafka 來源或接收器。如要進一步調整效能,請考慮使用 KafkaIO 連接器。如要進一步瞭解使用受管理 I/O 的優點,請參閱「Dataflow 受管理 I/O」一文。

需求條件

  • Kafka 用戶端 3.6.0 以上版本。

  • Apache Beam SDK 2.61.0 以上版本。

  • 啟動 Dataflow 工作時,您必須透過網路存取 Apache Kafka 啟動伺服器。舉例來說,您可以從可存取叢集所在虛擬私有雲的 Compute Engine 執行個體啟動作業。

  • 建立工作的主體必須具備下列 IAM 角色:

    • Managed Kafka Client (roles/managedkafka.client) 存取 Apache Kafka 叢集。
    • 服務帳戶使用者 (roles/iam.serviceAccountUser) 角色,以 Dataflow 工作站服務帳戶身分執行作業。
    • Storage 管理員 (roles/storage.admin) 將作業檔案上傳至 Cloud Storage。
    • Dataflow 管理員 (roles/dataflow.admin) 建立工作。

    如果您從 Compute Engine 執行個體啟動工作,可以將這些角色授予附加至 VM 的服務帳戶。詳情請參閱「建立使用使用者管理服務帳戶的 VM」。

    建立作業時,您也可以搭配服務帳戶模擬使用應用程式預設憑證 (ADC)。

設定受管理 I/O

如果管道使用 Managed I/O for Apache Kafka,請設定下列設定選項,向 Managed Service for Apache Kafka 進行驗證:

  • "security.protocol""SASL_SSL"
  • "sasl.mechanism""OAUTHBEARER"
  • "sasl.login.callback.handler.class""com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
  • "sasl.jaas.config""org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"

以下範例說明如何為 Managed Service for Apache Kafka 設定代管 I/O:

Java

    // Create configuration parameters for the Managed I/O transform.
    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
      .put("bootstrap_servers", options.getBootstrapServer())
      .put("topic", options.getTopic())
      .put("data_format", "RAW")
      // Set the following fields to authenticate with Application Default
      // Credentials (ADC):
      .put("security.protocol", "SASL_SSL")
      .put("sasl.mechanism", "OAUTHBEARER")
      .put("sasl.login.callback.handler.class",
          "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler")
      .put("sasl.jaas.config",   "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;")
      .build();

Python

pipeline
| beam.managed.Read(
    beam.managed.KAFKA,
    config={
      "bootstrap_servers": options.bootstrap_server,
      "topic": options.topic,
      "data_format": "RAW",
      # Set the following fields to authenticate with Application Default
      # Credentials (ADC):
      "security.protocol": "SASL_SSL",
      "sasl.mechanism": "OAUTHBEARER",
      "sasl.login.callback.handler.class":
          "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
      "sasl.jaas.config":
          "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
    }
)

設定 KafkaIO 連接器

以下範例說明如何為 Managed Service for Apache Kafka 設定 KafkaIO 連接器:

Java

String bootstap = options.getBootstrap();
String topicName = options.getTopic();

// Read from Kafka
pipeline.apply(KafkaIO.<String, String>read()
    .withBootstrapServers(bootstap)
    .withTopic(topicName)
    .withKeyDeserializer(IntegerSerializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withGCPApplicationDefaultCredentials())

// Write to Kafka
pipeline.apply(KafkaIO.<Integer, String>write()
    .withBootstrapServers(bootstrap)
    .withTopic(topicName)
    .withKeySerializer(IntegerSerializer.class)
    .withValueSerializer(StringSerializer.class)
    .withGCPApplicationDefaultCredentials());

Python

WriteToKafka(
  producer_config={
    "bootstrap.servers": options.bootstrap_servers,
    "security.protocol": 'SASL_SSL',
    "sasl.mechanism": "OAUTHBEARER",
    "sasl.login.callback.handler.class": "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
    "sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
  },
  topic=options.topic,
  key_serializer=("org.apache.kafka.common.serialization." "LongSerializer"),
  value_serializer=("org.apache.kafka.common.serialization." "StringSerializer")
)

後續步驟