本页面介绍了如何使用 Dataflow 从 Google Cloud Managed Service for Apache Kafka 读取数据,并将记录写入 BigQuery 表。本教程使用 Apache Kafka to BigQuery 模板创建 Dataflow 作业。
概览
Apache Kafka 是一个用于流式传输事件的开源平台。Kafka 通常用于分布式架构,可在松散耦合的各组件之间实现通信。您可以使用 Dataflow 从 Kafka 读取事件、处理事件,并将结果写入 BigQuery 表以供进一步分析。
Managed Service for Apache Kafka 是一项 Google Cloud 服务,可帮助您运行安全且可扩缩的 Kafka 集群。

所需权限
Dataflow 工作器服务账号必须具有以下 Identity and Access Management (IAM) 角色:
- Managed Kafka Client (
roles/managedkafka.client
) - BigQuery Data Editor (
roles/bigquery.dataEditor
)
如需了解详情,请参阅 Dataflow 安全性和权限。
创建 Kafka 集群
在此步骤中,您将创建一个 Managed Service for Apache Kafka 集群。如需了解详情,请参阅创建 Managed Service for Apache Kafka 集群。
控制台
前往 Managed Service for Apache Kafka > 集群页面。
点击创建。
在集群名称字段中,输入集群的名称。
在区域列表中,为集群选择一个位置。
点击创建。
gcloud
使用 managed-kafka clusters create
命令。
gcloud managed-kafka clusters create CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME
替换以下内容:
CLUSTER
:集群的名称REGION
:您在其中创建子网的区域PROJECT_ID
:您的项目 IDSUBNET_NAME
:您要部署集群的子网
创建集群通常需要 20-30 分钟。
创建 Kafka 主题
创建 Managed Service for Apache Kafka 集群后,创建主题。
控制台
前往 Managed Service for Apache Kafka > 集群页面。
点击集群的名称。
在集群详细信息页面中,点击创建主题。
在主题名称框中,输入主题的名称。
点击创建。
gcloud
使用 managed-kafka topics create
命令。
gcloud managed-kafka topics create TOPIC_NAME \
--cluster=CLUSTER \
--location=REGION \
--partitions=10 \
--replication-factor=3
替换以下内容:
TOPIC_NAME
:要创建的主题的名称
创建 BigQuery 表
在此步骤中,您将创建一个具有以下架构的 BigQuery 表:
列名 | 数据类型 |
---|---|
name |
STRING |
customer_id |
INTEGER |
如果您尚未创建 BigQuery 数据集,请先创建。 如需了解详情,请参阅创建数据集。然后创建一个新的空表:
控制台
转到 BigQuery 页面。
在探索器窗格中,展开您的项目,然后选择数据集。
在数据集信息部分中,点击
创建表。在基于以下数据源创建表列表中,选择空表。
在表框中,输入表的名称。
在架构部分中,点击以文本形式修改。
粘贴以下架构定义:
name:STRING, customer_id:INTEGER
点击创建表。
gcloud
使用 bq mk
命令。
bq mk --table \
PROJECT_ID:DATASET_NAME.TABLE_NAME \
name:STRING,customer_id:INTEGER
替换以下内容:
PROJECT_ID
:您的项目 IDDATASET_NAME
:数据集的名称TABLE_NAME
:要创建的表的名称。
运行 Dataflow 作业
创建 Kafka 集群和 BigQuery 表后,运行 Dataflow 模板。
控制台
首先,获取集群的引导服务器地址:
在 Google Cloud 控制台中,前往集群页面。
点击集群名称。
点击配置标签页。
从引导网址复制引导服务器地址。
接下来,运行模板以创建 Dataflow 作业:
前往 Dataflow > 作业页面。
点击基于模板创建作业。
在作业名称字段中,输入
kafka-to-bq
。在区域端点部分中,选择您的 Managed Service for Apache Kafka 集群所在的区域。
选择“Kafka to BigQuery”模板。
输入以下模板参数:
- Kafka 引导服务器:引导服务器地址
- 来源 Kafka 主题:要读取的主题的名称
- Kafka 来源身份验证模式:
APPLICATION_DEFAULT_CREDENTIALS
- Kafka 消息格式:
JSON
- 表名称策略:
SINGLE_TABLE_NAME
- BigQuery 输出表:BigQuery 表,格式如下:
PROJECT_ID
:DATASET_NAME
。TABLE_NAME
在死信队列下,选中将错误写入 BigQuery。
输入死信队列的 BigQuery 表名称,格式如下:
PROJECT_ID
:DATASET_NAME
。ERROR_TABLE_NAME
请勿提前创建此表。流水线会创建它。
点击运行作业。
gcloud
使用 dataflow flex-template run
命令。
gcloud dataflow flex-template run kafka-to-bq \ --template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \ --region LOCATION \ --parameters \ readBootstrapServerAndTopic=projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\ persistKafkaKey=false,\ writeMode=SINGLE_TABLE_NAME,\ kafkaReadOffset=earliest,\ kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\ messageFormat=JSON,\ outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME\ useBigQueryDLQ=true,\ outputDeadletterTable=PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME
执行以下变量替换操作:
LOCATION
:您的 Managed Service for Apache Kafka 所在的区域PROJECT_ID
:您的 Google Cloud 项目的名称CLUSTER_ID
:集群的名称TOPIC
:Kafka 主题的名称DATASET_NAME
:数据集的名称TABLE_NAME
:表的名称ERROR_TABLE_NAME
:死信队列的 BigQuery 表名称
请勿提前为死信队列创建表。流水线会创建它。
将消息发送到 Kafka
Dataflow 作业启动后,您可以将消息发送到 Kafka,然后流水线会将消息写入 BigQuery。
在与 Kafka 集群相同的子网中创建虚拟机,并安装 Kafka 命令行工具。如需了解详细说明,请参阅使用 CLI 发布和使用消息中的设置客户端机器。
运行以下命令,将消息写入 Kafka 主题:
kafka-console-producer.sh \ --topic TOPIC \ --bootstrap-server bootstrap.CLUSTER_ID.LOCATION.managedkafka.PROJECT_ID.cloud.goog:9092 \ --producer.config client.properties
执行以下变量替换操作:
TOPIC
:Kafka 主题的名称CLUSTER_ID
:集群的名称LOCATION
:集群所在的区域。PROJECT_ID
:您的 Google Cloud 项目的名称
在提示符处,输入以下几行文字以将消息发送到 Kafka:
{"name": "Alice", "customer_id": 1} {"name": "Bob", "customer_id": 2} {"name": "Charles", "customer_id": 3}
使用死信队列
在作业运行期间,流水线可能无法将个别消息写入 BigQuery。可能的错误包括:
- 序列化错误(包括格式错误的 JSON)。
- 类型转换错误(由于表架构与 JSON 数据不匹配导致)。
- JSON 数据中包含的不存在于表架构中的额外字段。
这些错误不会导致作业失败,也不会在 Dataflow 作业日志中显示为错误。相反,流水线使用死信队列来处理这些类型的错误。
如需在运行模板时启用死信队列,请设置以下模板参数:
useBigQueryDLQ
:true
outputDeadletterTable
:完全限定的 BigQuery 表名称,例如my-project:dataset1.errors
流水线会自动创建该表。如果在处理 Kafka 消息时发生错误,流水线会将错误条目写入表中。
错误消息示例:
错误类型 | 事件数据 | errorMessage |
---|---|---|
序列化错误 | "Hello world" | Failed to serialize json to table row: "Hello world" |
类型转换错误 | {"name":"Emily","customer_id":"abc"} | { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 } |
未知字段 | {"name":"Zoe","age":34} | { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: customer_id.", "reason" : "invalid" } ], "index" : 0 } |
使用 BigQuery 数据类型
在内部,Kafka I/O 连接器会将 JSON 消息载荷转换为 Apache Beam TableRow
对象,并将 TableRow
字段值转换为 BigQuery 类型。
下表显示了 BigQuery 数据类型的 JSON 表示形式。
BigQuery 类型 | JSON 表示法 |
---|---|
ARRAY |
[1.2,3] |
BOOL |
true |
DATE |
"2022-07-01" |
DATETIME |
"2022-07-01 12:00:00.00" |
DECIMAL |
5.2E11 |
FLOAT64 |
3.142 |
GEOGRAPHY |
"POINT(1 2)" 使用已知文本 (WKT) 或 GeoJSON 格式以字符串形式指定地理位置。如需了解详情,请参阅加载地理空间数据。 |
INT64 |
10 |
INTERVAL |
"0-13 370 48:61:61" |
STRING |
"string_val" |
TIMESTAMP |
"2022-07-01T12:00:00.00Z" 使用 JavaScript |
结构化数据
如果您的 JSON 消息遵循一致的架构,您可以使用 BigQuery 中的 STRUCT
数据类型来表示 JSON 对象。
在以下示例中,answers
字段是一个 JSON 对象,其中包含两个子字段即 a
和 b
:
{"name":"Emily","answers":{"a":"yes","b":"no"}}
以下 SQL 语句会创建具有兼容架构的 BigQuery 表:
CREATE TABLE my_dataset.kafka_events (name STRING, answers STRUCT<a STRING, b STRING>);
生成的表如下所示:
+-------+----------------------+
| name | answers |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+
半结构化数据
如果您的 JSON 消息未遵循严格架构,请考虑将其作为 JSON
数据类型存储在 BigQuery 中。
如果将 JSON 数据存储为 JSON
类型,则无需事先定义架构。注入数据后,您可以使用 GoogleSQL 中的字段访问运算符(通过点表示法)和数组访问运算符来查询数据。如需了解详情,请参阅处理 GoogleSQL 中的 JSON 数据。
使用 UDF 转换数据
本教程假定 Kafka 消息采用 JSON 格式,并且 BigQuery 表架构与 JSON 数据匹配,并且未对数据应用任何转换。
您也可以视需要提供一个 JavaScript 用户定义的函数 (UDF),在将数据写入 BigQuery 之前先用该函数转换数据。UDF 还可以执行额外的处理,例如过滤、去除个人身份信息 (PII),或使用更多字段丰富数据。
如需了解详情,请参阅为 Dataflow 模板创建用户定义的函数。