Apache Kafka to Cloud Storage 模板是一种流处理流水线,可从 Google Cloud Managed Service for Apache Kafka 注入文本数据,并将记录输出到 Cloud Storage。
您还可以将 Apache Kafka to BigQuery 模板与自行管理的 Kafka 或外部 Kafka 搭配使用。
流水线要求
- Cloud Storage 输出存储桶必须存在。
- Apache Kafka 代理服务器必须正在运行并可从 Dataflow 工作器机器进行访问。
- Apache Kafka 主题必须已存在。
Kafka 消息格式
此模板支持以如下格式从 Kafka 读取消息:
JSON 格式
如需读取 JSON 消息,请将 messageFormat
模板参数设置为 "JSON"
。
Avro 二进制编码
如需读取二进制 Avro 消息,请设置以下模板参数:
messageFormat
:"AVRO_BINARY_ENCODING"
。binaryAvroSchemaPath
:Cloud Storage 中 Avro 架构文件的位置。示例:gs://BUCKET_NAME/message-schema.avsc
。
如需详细了解 Avro 二进制格式,请参阅 Apache Avro 文档中的二进制编码。
Confluent 架构注册表编码的 Avro
如需读取 Confluent 架构注册表中以 Avro 编码的消息,请设置以下模板参数:
messageFormat
:"AVRO_CONFLUENT_WIRE_FORMAT"
。schemaFormat
:以下值之一:"SINGLE_SCHEMA_FILE"
:消息架构在 Avro 架构文件中定义。 在confluentAvroSchemaPath
参数中指定架构文件的 Cloud Storage 位置。-
"SCHEMA_REGISTRY"
:消息使用 Confluent 架构注册表进行编码。 在schemaRegistryConnectionUrl
参数中指定 Confluent 架构注册表实例的网址,并在schemaRegistryAuthenticationMode
参数中指定身份验证模式。
如需详细了解此格式,请参阅 Confluent 文档中的 序列化格式。
输出文件格式
输出文件格式与输入 Kafka 消息的格式相同。例如,如果您为 Kafka 消息格式选择 JSON,系统会将 JSON 文件写入输出 Cloud Storage 存储桶。
身份验证
Apache Kafka to Cloud Storage 模板支持对 Kafka 代理进行 SASL/PLAIN 身份验证。
模板参数
必需参数
- readBootstrapServerAndTopic:要从中读取输入的 Kafka 主题。
- outputDirectory:用于写入输出文件的路径和文件名前缀。必须以斜杠结尾。 例如
gs://your-bucket/your-path/
。 - kafkaReadAuthenticationMode:与 Kafka 集群搭配使用的身份验证模式。使用
KafkaAuthenticationMethod.NONE
表示不进行身份验证,使用KafkaAuthenticationMethod.SASL_PLAIN
表示使用 SASL/PLAIN 用户名和密码,使用KafkaAuthenticationMethod.SASL_SCRAM_512
表示使用 SASL_SCRAM_512 身份验证,使用KafkaAuthenticationMethod.TLS
表示使用基于证书的身份验证。KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS
应仅用于 Google Cloud Apache Kafka for BigQuery 集群,它允许使用应用默认凭据进行身份验证。 - messageFormat:要读取的 Kafka 消息的格式。支持的值包括
AVRO_CONFLUENT_WIRE_FORMAT
(Confluent 架构注册表编码的 Avro)、AVRO_BINARY_ENCODING
(普通二进制 Avro)和JSON
。默认值为:AVRO_CONFLUENT_WIRE_FORMAT。 - useBigQueryDLQ:如果为 true,失败的消息将写入 BigQuery,并附带额外的错误信息。默认值为:false。
可选参数
- windowDuration:将数据写入 Cloud Storage 的窗口时长/大小。允许的格式为 Ns(以秒为单位,例如 5s)、Nm(以分钟为单位,例如 12m)、Nh(以小时为单位,例如 2h)。 例如
5m
。默认值为 5m。 - outputFilenamePrefix:要在各窗口文件上放置的前缀。例如
output-
。默认值:output。 - numShards:写入时生成的输出分片数上限。分片数越多,写入 Cloud Storage 的吞吐量越高,但处理输出 Cloud Storage 文件时跨分片聚合数据的费用也可能更高。默认值由 Dataflow 决定。
- enableCommitOffsets:将已处理消息的偏移量提交到 Kafka。如果启用此参数,则在重启流水线时,消息处理的间隔或重复处理会降到最低。需要指定使用方群组 ID。默认值为:false。
- consumerGroupId:此流水线所属的使用方群组的唯一标识符。如果已启用“将偏移量提交到 Kafka”,则必须使用此参数。默认值为空。
- kafkaReadOffset:在没有提交偏移量的情况下读取消息的起点。最早的从最开始算起,最新的从最新消息算起。默认值为:latest。
- kafkaReadUsernameSecretId:Google Cloud Secret Manager Secret ID,其中包含要与
SASL_PLAIN
身份验证搭配使用的 Kafka 用户名。例如projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。默认值为空。 - kafkaReadPasswordSecretId:Google Cloud Secret Manager Secret ID,其中包含要与
SASL_PLAIN
身份验证搭配使用的 Kafka 密码。例如projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。默认值为空。 - kafkaReadKeystoreLocation:Java KeyStore (JKS) 文件的 Google Cloud Storage 路径,该文件包含在向 Kafka 集群进行身份验证时使用的 TLS 证书和私钥。例如
gs://your-bucket/keystore.jks
。 - kafkaReadTruststoreLocation:Java TrustStore (JKS) 文件的 Google Cloud Storage 路径,该文件包含用于验证 Kafka 代理身份的受信任证书。
- kafkaReadTruststorePasswordSecretId:Google Cloud Secret Manager Secret ID,其中包含用于访问 Java TrustStore (JKS) 文件以进行 Kafka TLS 身份验证的密码。例如,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。 - kafkaReadKeystorePasswordSecretId:Google Cloud Secret Manager Secret ID,其中包含用于访问 Java KeyStore (JKS) 文件以进行 Kafka TLS 身份验证的密码。例如
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。 - kafkaReadKeyPasswordSecretId:Google Cloud Secret Manager Secret ID,其中包含用于访问 Java KeyStore (JKS) 文件中的私钥以进行 Kafka TLS 身份验证的密码。例如
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。 - kafkaReadSaslScramUsernameSecretId:Google Cloud Secret Manager Secret ID,其中包含要与
SASL_SCRAM
身份验证搭配使用的 Kafka 用户名。例如projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。 - kafkaReadSaslScramPasswordSecretId:Google Cloud Secret Manager Secret ID,其中包含要与
SASL_SCRAM
身份验证搭配使用的 Kafka 密码。例如projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。 - kafkaReadSaslScramTruststoreLocation:Java TrustStore (JKS) 文件的 Google Cloud Storage 路径,该文件包含用于验证 Kafka 代理身份的受信任证书。
- kafkaReadSaslScramTruststorePasswordSecretId:Google Cloud Secret Manager Secret ID,其中包含用于访问 Java TrustStore (JKS) 文件以进行 Kafka SASL_SCRAM 身份验证的密码。例如,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。 - schemaFormat:Kafka 架构格式。可以以
SINGLE_SCHEMA_FILE
或SCHEMA_REGISTRY
的形式提供。如果指定了SINGLE_SCHEMA_FILE
,则对所有消息使用 avro 架构文件中提及的架构。如果指定了SCHEMA_REGISTRY
,消息可以具有单个架构或多个架构。默认值为 SINGLE_SCHEMA_FILE。 - confluentAvroSchemaPath:用于解码主题中所有消息的单个 Avro 架构文件的 Google Cloud Storage 路径。默认值为空。
- schemaRegistryConnectionUrl:用于管理 Avro 架构以进行消息解码的 Confluent 架构注册表实例的网址。默认值为空。
- binaryAvroSchemaPath:用于解码二进制编码 Avro 消息的 Avro 架构文件的 Google Cloud Storage 路径。默认值为空。
- schemaRegistryAuthenticationMode:架构注册表身份验证模式。可以是 NONE、TLS 或 OAUTH。默认为:NONE。
- schemaRegistryTruststoreLocation:SSL 证书的位置,用于存储对 Schema Registry 进行身份验证的信任库。例如
/your-bucket/truststore.jks
。 - schemaRegistryTruststorePasswordSecretId:Secret Manager 中存储信任库中用于访问 Secret 的密码的 SecretId。例如
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
。 - schemaRegistryKeystoreLocation:包含 SSL 证书和私钥的密钥库位置。例如
/your-bucket/keystore.jks
。 - schemaRegistryKeystorePasswordSecretId:Secret Manager 中用于访问密钥库文件的密码的 SecretId。例如,
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
。 - schemaRegistryKeyPasswordSecretId:访问密钥库中存储的客户端私钥所需的密码的 SecretId。例如,
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
。 - schemaRegistryOauthClientId:用于在 OAUTH 模式下对架构注册表客户端进行身份验证的客户端 ID。对于 AVRO_CONFLUENT_WIRE_FORMAT 消息格式,此属性是必需的。
- schemaRegistryOauthClientSecretId:Google Cloud Secret Manager Secret ID,其中包含用于在 OAUTH 模式下对架构注册表客户端进行身份验证的客户端密钥。对于 AVRO_CONFLUENT_WIRE_FORMAT 消息格式,此属性是必需的。例如
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
。 - schemaRegistryOauthScope:用于在 OAUTH 模式下对架构注册表客户端进行身份验证的访问令牌范围。此字段是可选字段,因为即使不传递范围参数,也可以发出请求。例如
openid
。 - schemaRegistryOauthTokenEndpointUrl:用于在 OAUTH 模式下对架构注册表客户端进行身份验证的基于 HTTP(S) 的 OAuth/OIDC 身份提供商的网址。对于 AVRO_CONFLUENT_WIRE_FORMAT 消息格式,此属性是必需的。
- outputDeadletterTable:失败消息的完全限定 BigQuery 表名称。出于各种原因(例如,架构不匹配、JSON 格式错误)未能到达输出表的消息会写入该表。该表将由模板创建。例如
your-project-id:your-dataset.your-table-name
。
运行模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Kafka to Cloud Storage template。
- 在提供的参数字段中,输入您的参数值。
- 可选:如需从“正好一次”处理切换到“至少一次”流处理模式,请选择至少一次。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Gcs_Flex \ --parameters \ readBootstrapServerAndTopic=BOOTSTRAP_SERVER_AND_TOPIC,\ kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\ messageFormat=JSON,\ outputDirectory=gs://STORAGE_BUCKET_NAME,\ useBigQueryDLQ=false
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Google Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称REGION_NAME
:要在其中部署 Dataflow 作业的区域,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates-REGION_NAME/latest/) 中可用- 版本名称(如
2023-09-12-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates-REGION_NAME/) 中
BOOTSTRAP_SERVER_AND_TOPIC
:Apache Kafka 引导服务器地址和主题引导服务器地址和主题的格式取决于集群类型:
- Managed Service for Apache Kafka 集群:
projects/PROJECT_ID/locations/REGION_NAME/clusters/CLUSTER_NAME/topics/TOPIC_NAME
- 外部 Kafka 集群:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Managed Service for Apache Kafka 集群:
STORAGE_BUCKET_NAME
:写入输出的 Cloud Storage 存储分区
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "readBootstrapServerAndTopic": "BOOTSTRAP_SERVER_AND_TOPIC", "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS", "messageFormat": "JSON", "outputDirectory": "gs://STORAGE_BUCKET_NAME", "useBigQueryDLQ": "false" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Gcs_Flex", } }
替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Google Cloud 项目 IDJOB_NAME
:您选择的唯一性作业名称LOCATION
:要在其中部署 Dataflow 作业的区域,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates-REGION_NAME/latest/) 中可用- 版本名称(如
2023-09-12-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates-REGION_NAME/) 中
BOOTSTRAP_SERVER_AND_TOPIC
:Apache Kafka 引导服务器地址和主题引导服务器地址和主题的格式取决于集群类型:
- Managed Service for Apache Kafka 集群:
projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_NAME/topics/TOPIC_NAME
- 外部 Kafka 集群:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Managed Service for Apache Kafka 集群:
STORAGE_BUCKET_NAME
:写入输出的 Cloud Storage 存储分区
后续步骤
- 了解 Dataflow 模板。
- 参阅 Google 提供的模板列表。