處理 Bigtable 變更串流


本教學課程說明如何將資料管道部署至 Dataflow,以取得來自 Bigtable 資料表變更串流的資料庫變更即時串流。管道的輸出內容會寫入 Cloud Storage 上的一系列檔案。

我們提供音樂收聽應用程式的範例資料集。在本教學課程中,您會追蹤聆聽的歌曲,然後在一段時間內列出前五名。

本教學課程適用於熟悉程式碼編寫,以及將資料管道部署至 Google Cloud的技術人員。

目標

本教學課程將說明如何執行下列操作:

  • 建立啟用變更串流的 Bigtable 資料表。
  • 在 Dataflow 上部署管道,轉換並輸出變更串流。
  • 查看資料管道的結果。

費用

在本文件中,您會使用 Google Cloud的下列計費元件:

如要根據預測用量估算費用,請使用 Pricing Calculator

初次使用 Google Cloud 的使用者可能符合免費試用資格。

完成本文所述工作後,您可以刪除已建立的資源,避免繼續計費。詳情請參閱清除所用資源一節。

事前準備

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
  8. Install the Google Cloud CLI.

  9. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Make sure that billing is enabled for your Google Cloud project.

  13. Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, and Cloud Storage APIs:

    gcloud services enable dataflow.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com storage.googleapis.com
  14. 更新並安裝 cbt CLI 。
    gcloud components update
    gcloud components install cbt

準備環境

取得程式碼

複製包含範例程式碼的存放區。如果先前已下載這個存放區,請提取最新版本。

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams

建立值區

  • Create a Cloud Storage bucket:
    gcloud storage buckets create gs://BUCKET_NAME
    Replace BUCKET_NAME with a bucket name that meets the bucket naming requirements.

    建立 Bigtable 執行個體

    在本教學課程中,您可以使用現有執行個體,或在您附近的區域建立執行個體,並採用預設設定。

    建立資料表

    這個範例應用程式會追蹤使用者收聽的歌曲,並將收聽事件儲存在 Bigtable 中。建立啟用變更串流的資料表,其中包含一個資料欄系列 (cf) 和一個資料欄 (song),並使用使用者 ID 做為資料列索引鍵。

    建立表格。

    gcloud bigtable instances tables create song-rank \
    --column-families=cf --change-stream-retention-period=7d \
    --instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
    

    更改下列內容:

    • PROJECT_ID:您使用的專案 ID
    • BIGTABLE_INSTANCE_ID:要包含新資料表的執行個體 ID

    啟動管道

    這個管道會執行下列動作,轉換變更串流:

    1. 讀取變更串流
    2. 取得歌曲名稱
    3. 將歌曲聆聽事件分組到 N 秒的時間範圍內
    4. 計算前五名歌曲
    5. 輸出結果

    執行管道。

    mvn compile exec:java -Dexec.mainClass=SongRank \
    "-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
    --bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
    --outputLocation=gs://BUCKET_NAME/ \
    --runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
    

    請將 BIGTABLE_REGION 改成 Bigtable 執行個體所在的區域 ID,例如 us-east5

    瞭解管道

    管道中的下列程式碼片段可協助您瞭解執行的程式碼。

    讀取變更串流

    這個範例中的程式碼會使用特定 Bigtable 執行個體和資料表的參數,設定來源串流。

    p.apply(
            "Stream from Bigtable",
            BigtableIO.readChangeStream()
                .withProjectId(options.getBigtableProjectId())
                .withInstanceId(options.getBigtableInstanceId())
                .withTableId(options.getBigtableTableId())
                .withAppProfileId(options.getBigtableAppProfile())
    
        )

    取得歌曲名稱

    當使用者聆聽歌曲時,系統會將歌曲名稱寫入欄系列 cf 和欄限定符 song,因此程式碼會從變更串流突變中擷取值,並輸出至管道的下一個步驟。

    private static class ExtractSongName extends DoFn<KV<ByteString, ChangeStreamMutation>, String> {
    
      @DoFn.ProcessElement
      public void processElement(ProcessContext c) {
    
        for (Entry e : Objects.requireNonNull(Objects.requireNonNull(c.element()).getValue())
            .getEntries()) {
          if (e instanceof SetCell) {
            SetCell setCell = (SetCell) e;
            if ("cf".equals(setCell.getFamilyName())
                && "song".equals(setCell.getQualifier().toStringUtf8())) {
              c.output(setCell.getValue().toStringUtf8());
            }
          }
        }
      }
    }

    計算前五名歌曲

    您可以使用內建的 Beam 函式 CountTop.of,取得目前視窗中排名前五的歌曲。

    .apply(Count.perElement())
    .apply("Top songs", Top.of(5, new SongComparator()).withoutDefaults())

    輸出結果

    這個管道會將結果寫入標準輸出內容和檔案。如果是檔案,系統會將寫入作業分組,每組 10 個元素或 1 分鐘的區段。

    .apply("Print", ParDo.of(new PrintFn()))
    .apply(
        "Collect at least 10 elements or 1 minute of elements",
        Window.<String>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(10),
                        AfterProcessingTime
                            .pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(1)
                            )
                    )
                ))
            .discardingFiredPanes())
    .apply(
        "Output top songs",
        TextIO.write()
            .to(options.getOutputLocation() + "song-charts/")
            .withSuffix(".txt")
            .withNumShards(1)
            .withWindowedWrites()
    );

    查看管道

    1. 前往 Google Cloud 控制台的「Dataflow」頁面。

      前往 Dataflow

    2. 按一下名稱開頭為 song-rank 的工作。

    3. 按一下畫面底部的「顯示」,開啟記錄面板。

    4. 按一下「Worker logs」(工作人員記錄),即可監控變更串流的輸出記錄。

    串流寫入

    使用 cbt CLI 將多位使用者收聽歌曲的次數寫入 song-rank 資料表。這項設計會在幾分鐘內完成寫入,模擬歌曲收聽串流隨時間累積的狀況。

    cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
    song-rank song-rank-data.csv  column-family=cf batch-size=1
    

    查看輸出內容

    讀取 Cloud Storage 的輸出內容,查看最熱門的歌曲。

    gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
    

    輸出內容範例:

    2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
    2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
    2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
    
  • 清除所用資源

    如要避免系統向您的 Google Cloud 帳戶收取本教學課程中所用資源的相關費用,請刪除含有該項資源的專案,或者保留專案但刪除個別資源。

    刪除專案

      Delete a Google Cloud project:

      gcloud projects delete PROJECT_ID

    刪除個別資源

    1. 刪除 bucket 和檔案。

      gcloud storage rm --recursive gs://BUCKET_NAME/
      
    2. 停用資料表的變更串流。

      gcloud bigtable instances tables update song-rank --instance=BIGTABLE_INSTANCE_ID \
      --clear-change-stream-retention-period
      
    3. 刪除資料表 song-rank

      cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID deletetable song-rank
      
    4. 停止變更串流管道。

      1. 列出工作以取得工作 ID。

        gcloud dataflow jobs list --region=BIGTABLE_REGION
        
      2. 取消工作。

        gcloud dataflow jobs cancel JOB_ID --region=BIGTABLE_REGION
        

        JOB_ID 換成上一個指令顯示的工作 ID。

    後續步驟