本教程介绍了如何将数据流水线部署到 Dataflow,以实时流式传输源自 Bigtable 表的变更数据流的数据库更改。流水线的输出会写入 Cloud Storage 上的一系列文件中。
本教程提供了一个聆听音乐的应用的示例数据集。在本教程中,您将跟踪聆听过的歌曲,然后排出一段时间内最常听的前五首歌曲。
本教程适用于熟悉代码编写以及将数据流水线部署到 Google Cloud的技术用户。
目标
本教程介绍了如何执行以下操作:
- 创建启用了变更数据流的 Bigtable 表。
- 在 Dataflow 上部署流水线,以转换和输出变更数据流。
- 查看数据流水线的结果。
费用
在本文档中,您将使用 Google Cloud的以下收费组件:
您可使用价格计算器根据您的预计使用情况来估算费用。
完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理。
准备工作
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name. -
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name. - 更新并安装
cbt
CLI。gcloud components update gcloud components install cbt
-
Create a Cloud Storage bucket and configure it as follows:
-
将
STORAGE_CLASS
替换为您偏好的存储类别。 -
将
LOCATION
替换为您偏好的位置(ASIA
、EU
或US
) -
将
BUCKET_NAME
替换为 符合存储桶名称要求的存储桶名称。 - PROJECT_ID:您正在使用的项目的 ID
- BIGTABLE_INSTANCE_ID:将包含新表的实例的 ID
- 读取变更数据流
- 获取歌曲名称
- 将歌曲聆听事件分组到 N 秒窗口中
- 统计前五首歌曲
- 输出结果
在 Google Cloud 控制台中,前往 Dataflow 页面。
点击名称以 song-rank 开头的作业。
点击屏幕底部的显示以打开日志面板。
点击工作器日志以监控变更数据流的输出日志。
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STORAGE_CLASS --location LOCATION
创建 Bigtable 实例
您可以在本教程中使用现有实例,也可以在您附近的区域使用默认配置创建实例。
创建表
示例应用跟踪用户聆听的歌曲并将聆听事件存储在 Bigtable 中。创建一个启用了变更数据流的表,该表具有一个列族 (cf) 和一个列(歌曲),并使用用户 ID 作为行键。
创建表。
gcloud bigtable instances tables create song-rank \ --column-families=cf --change-stream-retention-period=7d \ --instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
请替换以下内容:
启动流水线
此流水线通过执行以下操作来转换变更数据流:
运行流水线。
mvn compile exec:java -Dexec.mainClass=SongRank \ "-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \ --bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \ --outputLocation=gs://BUCKET_NAME/ \ --runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
将 BIGTABLE_REGION 替换为您的 Bigtable 实例所在的区域的 ID,例如
us-east5
。了解流水线
以下流水线代码段可帮助您了解正在运行的代码。
读取变更数据流
此示例中的代码使用特定 Bigtable 实例和表的参数配置源数据流。
获取歌曲名称
聆听歌曲时,歌曲名称会写入列族
cf
和列限定符song
,因此代码会从变更数据流变更中提取值并将其输出到流水线的下一步。统计前五首歌曲
您可以使用内置的 Beam 函数
Count
和Top.of
来获取当前时间段内最常听的前五首歌曲。输出结果
此流水线将结果写入标准输出和文件中。对于这些文件,它会将写入内容划分为包含 10 个元素或 1 分钟片段的组。
查看流水线
流式写入
使用
cbt
CLI 将各种用户聆听的歌曲次数写入song-rank
表。其设计目的是在几分钟内写入数据,以模拟一段时间内的歌曲聆听流式传输情况。cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \ song-rank song-rank-data.csv column-family=cf batch-size=1
查看输出
读取 Cloud Storage 上的输出,以查看最热门的歌曲。
gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
输出示例:
2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}] 2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}] 2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
-
将
删除存储桶和文件。
gcloud storage rm --recursive gs://BUCKET_NAME/
在表上停用变更数据流
gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \ --clear-change-stream-retention-period
删除
song-rank
表。cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rank
停止变更数据流流水线。
列出作业以获取作业 ID。
gcloud dataflow jobs list --region=BIGTABLE_REGION
取消作业。
gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGION
将 JOB_ID 替换为上一条命令之后显示的作业 ID。
安装 Google Cloud CLI。 安装完成后,运行以下命令来初始化 Google Cloud CLI:
gcloud init
如果您使用的是外部身份提供方 (IdP),则必须先使用联合身份登录 gcloud CLI。
Create or select a Google Cloud project.
Verify that billing is enabled for your Google Cloud project.
Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:
gcloud services enable dataflow.googleapis.combigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
安装 Google Cloud CLI。 安装完成后,运行以下命令来初始化 Google Cloud CLI:
gcloud init
如果您使用的是外部身份提供方 (IdP),则必须先使用联合身份登录 gcloud CLI。
Create or select a Google Cloud project.
Verify that billing is enabled for your Google Cloud project.
Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:
gcloud services enable dataflow.googleapis.combigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
准备环境
获取代码
克隆包含示例代码的代码库。如果您之前已下载此代码库,请拉取以获取最新版本。
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams
创建存储桶
清理
为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。
删除项目
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID