将 Dataflow 与 Managed Service for Apache Kafka 搭配使用

本页面介绍了如何在 Dataflow 流水线中使用 Google Cloud Managed Service for Apache Kafka 作为来源或接收器。

您可以使用以下任一方法:

要求

  • 在项目中启用 Cloud Storage、Dataflow 和 Managed Service for Apache Kafka API。请参阅启用 API 或运行以下 Google Cloud CLI 命令:

    gcloud services enable dataflow.googleapis.com managedkafka.googleapis.com storage-component.googleapis.com
    
  • Dataflow 工作器服务账号必须具有 Managed Kafka Client (roles/managedkafka.client) Identity and Access Management (IAM) 角色。

  • Dataflow 工作器虚拟机必须具有对 Kafka 引导服务器的网络访问权限。如需了解详情,请参阅配置 Managed Service for Apache Kafka 网络

获取引导服务器地址

如需运行连接到 Managed Service for Apache Kafka 集群的流水线,请先获取集群的引导服务器地址。您需要在配置流水线时使用此地址。

您可以使用 Google Cloud 控制台或 Google Cloud CLI,如下所示:

控制台

  1. 在 Google Cloud 控制台中,前往集群页面。

    转到集群

  2. 点击集群名称。

  3. 点击配置标签页。

  4. 引导网址复制引导服务器地址。

gcloud

使用 managed-kafka clusters describe 命令。

gcloud managed-kafka clusters describe CLUSTER_ID \
  --location=LOCATION \
  --format="value(bootstrapAddress)"

替换以下内容:

  • CLUSTER_ID:集群的 ID 或名称
  • LOCATION:集群的位置

如需了解详情,请参阅查看 Managed Service for Apache Kafka 集群

将 Managed Service for Apache Kafka 与 Dataflow 模板搭配使用

Google 提供了多个可从 Apache Kafka 读取数据的 Dataflow 模板:

这些模板可与 Managed Service for Apache Kafka 搭配使用。如果其中一个符合您的使用情形,请考虑使用该组件,而不是编写自定义流水线代码。

控制台

  1. 前往 Dataflow > 作业页面。

    转到作业

  2. 点击基于模板创建作业

  3. 作业名称中,输入作业的名称。

  4. Dataflow 模板下拉菜单中,选择要运行的模板。

  5. Kafka 引导服务器框中,输入引导服务器地址。

  6. Kafka 主题框中,输入主题的名称。

  7. 对于 Kafka 身份验证模式,请选择 APPLICATION_DEFAULT_CREDENTIALS

  8. 对于 Kafka 消息格式,请选择 Apache Kafka 消息的格式。

  9. 根据需要输入其他参数。每个模板的支持参数都有相关文档。

  10. 运行作业

gcloud

使用 gcloud dataflow jobs run 命令。

gcloud dataflow jobs run JOB_NAME \
  --gcs-location gs://TEMPLATE_FILE \
  --region REGION_NAME \
  --parameters \
readBootstrapServerAndTopic=projects/PROJECT_NAME/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS\,
# Other parameters, depending on the template
# ...

替换以下内容:

  • JOB_NAME:作业的名称
  • TEMPLATE_FILE:Cloud Storage 中模板文件的位置
  • REGION_NAME:您要部署作业的区域
  • PROJECT_NAME:您的 Google Cloud 项目的名称
  • LOCATION:集群的位置
  • CLUSTER_ID:集群的 ID 或名称
  • TOPIC:Kafka 主题的名称

将 Managed Service for Apache Kafka 与 Beam 流水线搭配使用

本部分介绍如何使用 Apache Beam SDK 创建和运行连接到 Managed Service for Apache Kafka 的 Dataflow 流水线。

在大多数情况下,请使用托管式 I/O 转换作为 Kafka 源或接收器。如果您需要更高级的性能调优,不妨考虑使用 KafkaIO 连接器。如需详细了解使用托管式 I/O 的优势,请参阅 Dataflow 托管式 I/O

要求

  • Kafka 客户端版本 3.6.0 或更高版本。

  • Apache Beam SDK 2.61.0 版或更高版本。

  • 您启动 Dataflow 作业的机器必须具有对 Apache Kafka 引导服务器的网络访问权限。例如,从可以访问集群所在 VPC 的 Compute Engine 实例启动作业。

  • 创建作业的主账号必须具有以下 IAM 角色:

    • Managed Kafka Client (roles/managedkafka.client) 来访问 Apache Kafka 集群。
    • Service Account User (roles/iam.serviceAccountUser),用于充当 Dataflow Worker 服务账号。
    • Storage Admin (roles/storage.admin) 角色,用于将作业文件上传到 Cloud Storage。
    • Dataflow Admin (roles/dataflow.admin) 角色才能创建作业。

    如果您从 Compute Engine 实例启动作业,则可以向关联到虚拟机的服务账号授予这些角色。如需了解详情,请参阅创建使用用户管理的服务账号的虚拟机

    您还可以在创建作业时,将应用默认凭据 (ADC) 与服务账号模拟搭配使用。

配置托管式 I/O

如果您的流水线使用 Managed I/O for Apache Kafka,请设置以下配置选项以通过 Managed Service for Apache Kafka 进行身份验证:

  • "security.protocol""SASL_SSL"
  • "sasl.mechanism""OAUTHBEARER"
  • "sasl.login.callback.handler.class""com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
  • "sasl.jaas.config""org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"

以下示例展示了如何为 Managed Service for Apache Kafka 配置受管 I/O:

Java

    // Create configuration parameters for the Managed I/O transform.
    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
      .put("bootstrap_servers", options.getBootstrapServer())
      .put("topic", options.getTopic())
      .put("data_format", "RAW")
      // Set the following fields to authenticate with Application Default
      // Credentials (ADC):
      .put("security.protocol", "SASL_SSL")
      .put("sasl.mechanism", "OAUTHBEARER")
      .put("sasl.login.callback.handler.class",
          "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler")
      .put("sasl.jaas.config",   "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;")
      .build();

Python

pipeline
| beam.managed.Read(
    beam.managed.KAFKA,
    config={
      "bootstrap_servers": options.bootstrap_server,
      "topic": options.topic,
      "data_format": "RAW",
      # Set the following fields to authenticate with Application Default
      # Credentials (ADC):
      "security.protocol": "SASL_SSL",
      "sasl.mechanism": "OAUTHBEARER",
      "sasl.login.callback.handler.class":
          "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
      "sasl.jaas.config":
          "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
    }
)

配置 KafkaIO 连接器

以下示例展示了如何为 Managed Service for Apache Kafka 配置 KafkaIO 连接器:

Java

String bootstap = options.getBootstrap();
String topicName = options.getTopic();

// Read from Kafka
pipeline.apply(KafkaIO.<String, String>read()
    .withBootstrapServers(bootstap)
    .withTopic(topicName)
    .withKeyDeserializer(IntegerSerializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withGCPApplicationDefaultCredentials())

// Write to Kafka
pipeline.apply(KafkaIO.<Integer, String>write()
    .withBootstrapServers(bootstrap)
    .withTopic(topicName)
    .withKeySerializer(IntegerSerializer.class)
    .withValueSerializer(StringSerializer.class)
    .withGCPApplicationDefaultCredentials());

Python

WriteToKafka(
  producer_config={
    "bootstrap.servers": options.bootstrap_servers,
    "security.protocol": 'SASL_SSL',
    "sasl.mechanism": "OAUTHBEARER",
    "sasl.login.callback.handler.class": "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
    "sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
  },
  topic=options.topic,
  key_serializer=("org.apache.kafka.common.serialization." "LongSerializer"),
  value_serializer=("org.apache.kafka.common.serialization." "StringSerializer")
)

后续步骤