本页面介绍了如何在 Dataflow 流水线中使用 Google Cloud Managed Service for Apache Kafka 作为来源或接收器。
您可以使用以下任一方法:
要求
在项目中启用 Cloud Storage、Dataflow 和 Managed Service for Apache Kafka API。请参阅启用 API 或运行以下 Google Cloud CLI 命令:
gcloud services enable dataflow.googleapis.com managedkafka.googleapis.com storage-component.googleapis.com
Dataflow 工作器服务账号必须具有 Managed Kafka Client (
roles/managedkafka.client
) Identity and Access Management (IAM) 角色。Dataflow 工作器虚拟机必须具有对 Kafka 引导服务器的网络访问权限。如需了解详情,请参阅配置 Managed Service for Apache Kafka 网络。
获取引导服务器地址
如需运行连接到 Managed Service for Apache Kafka 集群的流水线,请先获取集群的引导服务器地址。您需要在配置流水线时使用此地址。
您可以使用 Google Cloud 控制台或 Google Cloud CLI,如下所示:
控制台
在 Google Cloud 控制台中,前往集群页面。
点击集群名称。
点击配置标签页。
从引导网址复制引导服务器地址。
gcloud
使用 managed-kafka clusters describe
命令。
gcloud managed-kafka clusters describe CLUSTER_ID \
--location=LOCATION \
--format="value(bootstrapAddress)"
替换以下内容:
- CLUSTER_ID:集群的 ID 或名称
- LOCATION:集群的位置
如需了解详情,请参阅查看 Managed Service for Apache Kafka 集群。
将 Managed Service for Apache Kafka 与 Dataflow 模板搭配使用
Google 提供了多个可从 Apache Kafka 读取数据的 Dataflow 模板:
这些模板可与 Managed Service for Apache Kafka 搭配使用。如果其中一个符合您的使用情形,请考虑使用该组件,而不是编写自定义流水线代码。
控制台
前往 Dataflow > 作业页面。
点击基于模板创建作业。
在作业名称中,输入作业的名称。
从 Dataflow 模板下拉菜单中,选择要运行的模板。
在 Kafka 引导服务器框中,输入引导服务器地址。
在 Kafka 主题框中,输入主题的名称。
对于 Kafka 身份验证模式,请选择 APPLICATION_DEFAULT_CREDENTIALS。
对于 Kafka 消息格式,请选择 Apache Kafka 消息的格式。
根据需要输入其他参数。每个模板的支持参数都有相关文档。
运行作业。
gcloud
使用 gcloud dataflow jobs run
命令。
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://TEMPLATE_FILE \
--region REGION_NAME \
--parameters \
readBootstrapServerAndTopic=projects/PROJECT_NAME/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS\,
# Other parameters, depending on the template
# ...
替换以下内容:
- JOB_NAME:作业的名称
- TEMPLATE_FILE:Cloud Storage 中模板文件的位置
- REGION_NAME:您要部署作业的区域
- PROJECT_NAME:您的 Google Cloud 项目的名称
- LOCATION:集群的位置
- CLUSTER_ID:集群的 ID 或名称
- TOPIC:Kafka 主题的名称
将 Managed Service for Apache Kafka 与 Beam 流水线搭配使用
本部分介绍如何使用 Apache Beam SDK 创建和运行连接到 Managed Service for Apache Kafka 的 Dataflow 流水线。
在大多数情况下,请使用托管式 I/O 转换作为 Kafka 源或接收器。如果您需要更高级的性能调优,不妨考虑使用 KafkaIO
连接器。如需详细了解使用托管式 I/O 的优势,请参阅 Dataflow 托管式 I/O。
要求
Kafka 客户端版本 3.6.0 或更高版本。
Apache Beam SDK 2.61.0 版或更高版本。
您启动 Dataflow 作业的机器必须具有对 Apache Kafka 引导服务器的网络访问权限。例如,从可以访问集群所在 VPC 的 Compute Engine 实例启动作业。
创建作业的主账号必须具有以下 IAM 角色:
- Managed Kafka Client (
roles/managedkafka.client
) 来访问 Apache Kafka 集群。 - Service Account User (
roles/iam.serviceAccountUser
),用于充当 Dataflow Worker 服务账号。 - Storage Admin (
roles/storage.admin
) 角色,用于将作业文件上传到 Cloud Storage。 - Dataflow Admin (
roles/dataflow.admin
) 角色才能创建作业。
如果您从 Compute Engine 实例启动作业,则可以向关联到虚拟机的服务账号授予这些角色。如需了解详情,请参阅创建使用用户管理的服务账号的虚拟机。
您还可以在创建作业时,将应用默认凭据 (ADC) 与服务账号模拟搭配使用。
- Managed Kafka Client (
配置托管式 I/O
如果您的流水线使用 Managed I/O for Apache Kafka,请设置以下配置选项以通过 Managed Service for Apache Kafka 进行身份验证:
"security.protocol"
:"SASL_SSL"
"sasl.mechanism"
:"OAUTHBEARER"
"sasl.login.callback.handler.class"
:"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
"sasl.jaas.config"
:"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
以下示例展示了如何为 Managed Service for Apache Kafka 配置受管 I/O:
Java
// Create configuration parameters for the Managed I/O transform.
ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
.put("bootstrap_servers", options.getBootstrapServer())
.put("topic", options.getTopic())
.put("data_format", "RAW")
// Set the following fields to authenticate with Application Default
// Credentials (ADC):
.put("security.protocol", "SASL_SSL")
.put("sasl.mechanism", "OAUTHBEARER")
.put("sasl.login.callback.handler.class",
"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler")
.put("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;")
.build();
Python
pipeline
| beam.managed.Read(
beam.managed.KAFKA,
config={
"bootstrap_servers": options.bootstrap_server,
"topic": options.topic,
"data_format": "RAW",
# Set the following fields to authenticate with Application Default
# Credentials (ADC):
"security.protocol": "SASL_SSL",
"sasl.mechanism": "OAUTHBEARER",
"sasl.login.callback.handler.class":
"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
"sasl.jaas.config":
"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
}
)
配置 KafkaIO
连接器
以下示例展示了如何为 Managed Service for Apache Kafka 配置 KafkaIO
连接器:
Java
String bootstap = options.getBootstrap();
String topicName = options.getTopic();
// Read from Kafka
pipeline.apply(KafkaIO.<String, String>read()
.withBootstrapServers(bootstap)
.withTopic(topicName)
.withKeyDeserializer(IntegerSerializer.class)
.withValueDeserializer(StringDeserializer.class)
.withGCPApplicationDefaultCredentials())
// Write to Kafka
pipeline.apply(KafkaIO.<Integer, String>write()
.withBootstrapServers(bootstrap)
.withTopic(topicName)
.withKeySerializer(IntegerSerializer.class)
.withValueSerializer(StringSerializer.class)
.withGCPApplicationDefaultCredentials());
Python
WriteToKafka(
producer_config={
"bootstrap.servers": options.bootstrap_servers,
"security.protocol": 'SASL_SSL',
"sasl.mechanism": "OAUTHBEARER",
"sasl.login.callback.handler.class": "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
"sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
},
topic=options.topic,
key_serializer=("org.apache.kafka.common.serialization." "LongSerializer"),
value_serializer=("org.apache.kafka.common.serialization." "StringSerializer")
)
后续步骤
- 详细了解 Managed Service for Apache Kafka。
- 将数据从 Managed Service for Apache Kafka 写入 BigQuery。
- 从 Apache Kafka 读取到 Dataflow。
- 从 Dataflow 写入 Apache Kafka。