处理 Bigtable 变更数据流


本教程介绍了如何将数据流水线部署到 Dataflow,以实时流式传输源自 Bigtable 表的变更数据流的数据库更改。流水线的输出会写入 Cloud Storage 上的一系列文件中。

本教程提供了一个聆听音乐的应用的示例数据集。在本教程中,您将跟踪聆听过的歌曲,然后排出一段时间内最常听的前五首歌曲。

本教程适用于熟悉代码编写以及将数据流水线部署到 Google Cloud 的技术用户。

目标

本教程介绍了如何执行以下操作:

  • 创建启用了变更数据流的 Bigtable 表。
  • 在 Dataflow 上部署流水线,以转换和输出变更数据流。
  • 查看数据流水线的结果。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理

准备工作

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
  7. Install the Google Cloud CLI.
  8. To initialize the gcloud CLI, run the following command:

    gcloud init
  9. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  10. Make sure that billing is enabled for your Google Cloud project.

  11. Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
  12. 更新并安装 cbt CLI。
    gcloud components update
    gcloud components install cbt

准备环境

获取代码

克隆包含示例代码的代码库。如果您之前已下载此代码库,请拉取以获取最新版本。

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams

创建存储桶

  • Create a Cloud Storage bucket:
    gcloud storage buckets create gs://BUCKET_NAME
    Replace BUCKET_NAME with a bucket name that meets the bucket naming requirements.

    创建 Bigtable 实例

    您可以在本教程中使用现有实例,也可以在您附近的区域使用默认配置创建实例

    创建表

    示例应用跟踪用户聆听的歌曲并将聆听事件存储在 Bigtable 中。创建一个启用了变更数据流的表,该表具有一个列族 (cf) 和一个列(歌曲),并使用用户 ID 作为行键。

    创建表。

    gcloud bigtable instances tables create song-rank \
    --column-families=cf --change-stream-retention-period=7d \
    --instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
    

    请替换以下内容:

    • PROJECT_ID:您使用的项目的 ID
    • BIGTABLE_INSTANCE_ID:要包含新表的实例的 ID

    启动流水线

    此流水线通过执行以下操作来转换变更数据流:

    1. 读取变更数据流
    2. 获取歌曲名称
    3. 将歌曲聆听事件分组到 N 秒窗口中
    4. 统计前五首歌曲
    5. 输出结果

    运行流水线。

    mvn compile exec:java -Dexec.mainClass=SongRank \
    "-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
    --bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
    --outputLocation=gs://BUCKET_NAME/ \
    --runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
    

    BIGTABLE_REGION 替换为 Bigtable 实例所在区域的 ID,例如 us-east5

    了解流水线

    流水线中的以下代码段可帮助您了解正在运行的代码。

    读取变更数据流

    此示例中的代码使用特定 Bigtable 实例和表的参数配置源数据流。

    p.apply(
            "Stream from Bigtable",
            BigtableIO.readChangeStream()
                .withProjectId(options.getBigtableProjectId())
                .withInstanceId(options.getBigtableInstanceId())
                .withTableId(options.getBigtableTableId())
                .withAppProfileId(options.getBigtableAppProfile())
    
        )

    获取歌曲名称

    聆听歌曲时,歌曲名称会写入列族 cf 和列限定符 song,因此代码会从变更数据流变更中提取值并将其输出到流水线的下一步。

    private static class ExtractSongName extends DoFn<KV<ByteString, ChangeStreamMutation>, String> {
    
      @DoFn.ProcessElement
      public void processElement(ProcessContext c) {
    
        for (Entry e : Objects.requireNonNull(Objects.requireNonNull(c.element()).getValue())
            .getEntries()) {
          if (e instanceof SetCell) {
            SetCell setCell = (SetCell) e;
            if ("cf".equals(setCell.getFamilyName())
                && "song".equals(setCell.getQualifier().toStringUtf8())) {
              c.output(setCell.getValue().toStringUtf8());
            }
          }
        }
      }
    }

    统计前五首歌曲

    您可以使用内置的 Beam 函数 CountTop.of 来获取当前时间段内最常听的前五首歌曲。

    .apply(Count.perElement())
    .apply("Top songs", Top.of(5, new SongComparator()).withoutDefaults())

    输出结果

    此流水线将结果写入标准输出和文件中。对于这些文件,它会将写入内容划分为包含 10 个元素或 1 分钟片段的组。

    .apply("Print", ParDo.of(new PrintFn()))
    .apply(
        "Collect at least 10 elements or 1 minute of elements",
        Window.<String>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(10),
                        AfterProcessingTime
                            .pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(1)
                            )
                    )
                ))
            .discardingFiredPanes())
    .apply(
        "Output top songs",
        TextIO.write()
            .to(options.getOutputLocation() + "song-charts/")
            .withSuffix(".txt")
            .withNumShards(1)
            .withWindowedWrites()
    );

    查看流水线

    1. 在 Google Cloud 控制台中,转到 Dataflow 页面。

      进入 Dataflow

    2. 点击名称以 song-rank 开头的作业。

    3. 点击屏幕底部的显示,以打开日志面板。

    4. 点击工作器日志以监控变更数据流的输出日志。

    流式写入

    使用 cbt CLI 将各种用户聆听的歌曲次数写入 song-rank 表。其设计目的是在几分钟内写入数据,以模拟一段时间内的歌曲聆听流式传输情况。

    cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
    song-rank song-rank-data.csv  column-family=cf batch-size=1
    

    查看输出

    读取 Cloud Storage 上的输出,以查看最热门的歌曲。

    gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
    

    输出示例:

    2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
    2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
    2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
    
  • 清理

    为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

    删除项目

      Delete a Google Cloud project:

      gcloud projects delete PROJECT_ID

    删除各个资源

    1. 删除存储桶和文件。

      gcloud storage rm --recursive gs://BUCKET_NAME/
      
    2. 在表上停用变更数据流

      gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \
      --clear-change-stream-retention-period
      
    3. 删除 song-rank 表。

      cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rank
      
    4. 停止变更数据流流水线。

      1. 列出作业以获取作业 ID。

        gcloud dataflow jobs list --region=BIGTABLE_REGION
        
      2. 取消作业。

        gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGION
        

        JOB_ID 替换为上一条命令之后显示的作业 ID。

    后续步骤