建立電子商務串流管道


在本教學課程中,您會建立 Dataflow 串流管道,轉換來自 Pub/Sub 主題和訂閱項目的電子商務資料,並將資料輸出至 BigQuery 和 Bigtable。本教學課程需要 Gradle

本教學課程提供端對端電子商務範例應用程式,可將網路商店的資料串流至 BigQuery 和 Bigtable。這個範例應用程式說明瞭實作串流資料分析和即時人工智慧 (AI) 的常見用途和最佳做法。請參閱本教學課程,瞭解如何動態回應顧客動作,即時分析及回應事件。本教學課程說明如何儲存、分析及以圖表呈現事件資料,進一步瞭解消費者行為。

您可以在 GitHub 上取得範例應用程式。如要使用 Terraform 執行本教學課程,請按照 GitHub 範例應用程式提供的步驟操作。

目標

  • 驗證傳入的資料,並盡可能套用修正內容。
  • 分析點擊流資料,計算指定時間範圍內每個產品的瀏覽次數。將這項資訊儲存在低延遲儲存空間中。 應用程式接著就能使用這些資料,在網站上向顧客顯示查看這項產品的人數訊息。
  • 使用交易資料輔助訂購商品:

    • 分析交易資料,計算特定期間內每項商品的銷售總數 (依商店和全球)。
    • 分析商品目錄資料,計算每項商品的進貨量。
    • 持續將這項資料傳送至商品目錄系統,以供商品目錄購買決策使用。
  • 驗證傳入的資料,並盡可能套用修正內容。 將任何無法修正的資料寫入無法寄送的訊息佇列,以進行額外分析和處理。建立指標,代表傳送至無法傳送郵件佇列的傳入資料百分比,以供監控和快訊使用。

  • 將所有傳入資料處理為標準格式,並儲存在資料倉儲中,以供日後分析和視覺化。

  • 將店內銷售交易資料去正規化,以便納入商店位置的緯度和經度等資訊。透過 BigQuery 中緩慢變動的資料表提供商店資訊,並使用商店 ID 做為鍵。

資料

應用程式會處理下列類型的資料:

  • 線上系統傳送至 Pub/Sub 的點擊流資料。
  • 內部部署或軟體即服務 (SaaS) 系統傳送至 Pub/Sub 的交易資料。
  • 內部部署或 SaaS 系統傳送至 Pub/Sub 的股票資料。

工作模式

應用程式包含下列工作模式,這些模式常見於使用 Java 適用的 Apache Beam SDK 建構的管道:

費用

在本文件中,您會使用 Google Cloud的下列計費元件:

  • BigQuery
  • Bigtable
  • Cloud Scheduler
  • Compute Engine
  • Dataflow
  • Pub/Sub

如要根據預測用量估算費用,請使用 Pricing Calculator

初次使用 Google Cloud 的使用者可能符合免費試用資格。

完成本文所述工作後,您可以刪除已建立的資源,避免繼續計費。詳情請參閱清除所用資源一節。

事前準備

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com  cloudscheduler.googleapis.com
  8. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  10. Install the Google Cloud CLI.

  11. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  12. To initialize the gcloud CLI, run the following command:

    gcloud init
  13. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  14. Make sure that billing is enabled for your Google Cloud project.

  15. Enable the Compute Engine, Dataflow, Pub/Sub, BigQuery, Bigtable, Bigtable Admin, and Cloud Scheduler APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com pubsub.googleapis.com bigquery.googleapis.com bigtable.googleapis.com bigtableadmin.googleapis.com  cloudscheduler.googleapis.com
  16. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  17. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  18. 為新管道建立使用者代管的工作人員服務帳戶,並將必要角色授予該服務帳戶。

    1. 如要建立服務帳戶,請執行 gcloud iam service-accounts create 指令:

      gcloud iam service-accounts create retailpipeline \
          --description="Retail app data pipeline worker service account" \
          --display-name="Retail app data pipeline access"
    2. 將角色授予服務帳戶。針對下列每個 IAM 角色,執行一次下列指令:

      • roles/dataflow.admin
      • roles/dataflow.worker
      • roles/pubsub.editor
      • roles/bigquery.dataEditor
      • roles/bigtable.admin
      • roles/bigquery.jobUser
      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

      SERVICE_ACCOUNT_ROLE 替換為各個角色。

    3. 將可為服務帳戶建立存取權杖的角色授予 Google 帳戶:

      gcloud iam service-accounts add-iam-policy-binding retailpipeline@PROJECT_ID.iam.gserviceaccount.com --member="user:EMAIL_ADDRESS" --role=roles/iam.serviceAccountTokenCreator
  19. 視需要下載及安裝 Gradle

建立範例來源和接收器

本節說明如何建立下列項目:

  • 做為暫時儲存位置的 Cloud Storage bucket
  • 使用 Pub/Sub 串流資料來源
  • 將資料載入 BigQuery 的資料集
  • Bigtable 執行個體

建立 Cloud Storage 值區

首先,請建立 Cloud Storage 值區。 這個 bucket 是 Dataflow 管道的暫時儲存位置。

使用 gcloud storage buckets create 指令

gcloud storage buckets create gs://BUCKET_NAME --location=LOCATION

更改下列內容:

  • BUCKET_NAME:Cloud Storage bucket 的名稱,必須符合bucket 命名規定。Cloud Storage bucket 名稱不得重複。
  • LOCATION:值區的位置

建立 Pub/Sub 主題和訂閱項目

建立四個 Pub/Sub 主題,然後建立三個訂閱項目。

如要建立主題,請為每個主題執行一次 gcloud pubsub topics create 指令。如要瞭解如何命名訂閱項目,請參閱「主題或訂閱項目命名規範」。

gcloud pubsub topics create TOPIC_NAME

TOPIC_NAME 替換為下列值,並針對每個主題執行一次指令,總共執行四次:

  • Clickstream-inbound
  • Transactions-inbound
  • Inventory-inbound
  • Inventory-outbound

如要為主題建立訂閱項目,請針對每個訂閱項目執行一次 gcloud pubsub subscriptions create 指令:

  1. 建立Clickstream-inbound-sub訂閱項目:

    gcloud pubsub subscriptions create --topic Clickstream-inbound Clickstream-inbound-sub
    
  2. 建立Transactions-inbound-sub訂閱項目:

    gcloud pubsub subscriptions create --topic Transactions-inbound Transactions-inbound-sub
    
  3. 建立Inventory-inbound-sub訂閱項目:

    gcloud pubsub subscriptions create --topic Inventory-inbound Inventory-inbound-sub
    

建立 BigQuery 資料集和資料表

建立 BigQuery 資料集和已分割資料表,並為 Pub/Sub 主題設定適當的結構定義。

  1. 使用 bq mk 指令建立第一個資料集。

    bq --location=US mk \
    PROJECT_ID:Retail_Store
    
  2. 建立第二個資料集。

    bq --location=US mk \
    PROJECT_ID:Retail_Store_Aggregations
    
  3. 使用 CREATE TABLE SQL 陳述式建立含有結構定義和測試資料的資料表。測試資料有一個 ID 值為 1 的商店。緩慢更新的側邊輸入模式會使用這個資料表。

    bq query --use_legacy_sql=false \
      'CREATE TABLE
        Retail_Store.Store_Locations
        (
          id INT64,
          city STRING,
          state STRING,
          zip INT64
        );
      INSERT INTO Retail_Store.Store_Locations
      VALUES (1, "a_city", "a_state",00000);'
    

建立 Bigtable 執行個體和資料表

建立 Bigtable 執行個體和資料表。如要進一步瞭解如何建立 Bigtable 執行個體,請參閱「建立執行個體」。

  1. 如有需要,請執行下列指令來安裝 cbt CLI

    gcloud components install cbt
    
  2. 使用 bigtable instances create 指令建立執行個體:

    gcloud bigtable instances create aggregate-tables \
        --display-name=aggregate-tables \
        --cluster-config=id=aggregate-tables-c1,zone=CLUSTER_ZONE,nodes=1
    

    CLUSTER_ZONE 替換為叢集執行的區域

  3. 使用 cbt createtable 指令建立資料表:

    cbt -instance=aggregate-tables createtable PageView5MinAggregates
    
  4. 使用下列指令將資料欄系列新增至資料表:

    cbt -instance=aggregate-tables createfamily PageView5MinAggregates pageViewAgg
    

執行管道

使用 Gradle 執行串流管道。 如要查看管線使用的 Java 程式碼,請參閱 RetailDataProcessingPipeline.java

  1. 使用 git clone 指令複製 GitHub 存放區:

    git clone https://github.com/GoogleCloudPlatform/dataflow-sample-applications.git
    
  2. 切換至應用程式目錄:

    cd dataflow-sample-applications/retail/retail-java-applications
    
  3. 如要測試管道,請在殼層或終端機中,使用 Gradle 執行下列指令:

    ./gradlew :data-engineering-dept:pipelines:test --tests RetailDataProcessingPipelineSimpleSmokeTest --info --rerun-tasks
    
  4. 如要執行管道,請使用 Gradle 執行下列指令:

    ./gradlew tasks executeOnDataflow -Dexec.args=" \
    --project=PROJECT_ID \
    --tempLocation=gs://BUCKET_NAME/temp/ \
    --runner=DataflowRunner \
    --region=REGION \
    --clickStreamPubSubSubscription=projects/PROJECT_ID/subscriptions/Clickstream-inbound-sub \
    --transactionsPubSubSubscription=projects/PROJECT_ID/subscriptions/Transactions-inbound-sub \
    --inventoryPubSubSubscriptions=projects/PROJECT_ID/subscriptions/Inventory-inbound-sub \
    --aggregateStockPubSubOutputTopic=projects/PROJECT_ID/topics/Inventory-outbound \
    --dataWarehouseOutputProject=PROJECT_ID \
    --serviceAccount=retailpipeline.PROJECT_ID.iam.gserviceaccount.com"
    

請參閱 GitHub 上的管道原始碼

建立及執行 Cloud Scheduler 工作

建立並執行三項 Cloud Scheduler 工作,分別用於發布點擊流資料、目錄資料和交易資料。這個步驟會為管道產生範例資料。

  1. 如要為本教學課程建立 Cloud Scheduler 工作,請使用 gcloud scheduler jobs create 指令。這個步驟會建立點擊串流資料的發布者,每分鐘發布一則訊息。

    gcloud scheduler jobs create pubsub clickstream \
      --schedule="* * * * *" \
      --location=LOCATION \
      --topic="Clickstream-inbound" \
      --message-body='{"uid":464670,"sessionId":null,"returning":false,"lat":39.669082,"lng":-80.312306,"agent":"Mozilla/5.0 (iPad; CPU OS 12_2 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148;","event":"add-to-cart","transaction":false,"timestamp":1660091197071,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"user_id":74378,"client_id":"52393559","page_previous":"P_3","page":"P_3","event_datetime":"2022-08-10 12:26:37"}'
    
  2. 如要啟動 Cloud Scheduler 工作,請使用 gcloud scheduler jobs run 指令。

    gcloud scheduler jobs run --location=LOCATION clickstream
    
  3. 建立並執行另一個類似的發布商,發布商品目錄資料,每兩分鐘發布一則訊息。

    gcloud scheduler jobs create pubsub inventory \
      --schedule="*/2 * * * *" \
      --location=LOCATION  \
      --topic="Inventory-inbound" \
      --message-body='{"count":1,"sku":0,"aisleId":0,"product_name":null,"departmentId":0,"price":null,"recipeId":null,"image":null,"timestamp":1660149636076,"store_id":1,"product_id":10050}'
    
  4. 啟動第二個 Cloud Scheduler 工作。

    gcloud scheduler jobs run --location=LOCATION inventory
    
  5. 建立並執行第三個發布者,用於發布交易資料,每兩分鐘發布一則訊息。

    gcloud scheduler jobs create pubsub transactions \
      --schedule="*/2 * * * *" \
      --location=LOCATION  \
      --topic="Transactions-inbound" \
      --message-body='{"order_number":"b8be9222-990d-11ea-9c05-42010af00081","user_id":998685,"store_id":1,"returning":false,"time_of_sale":0,"department_id":0,"product_id":4,"product_count":1,"price":25.0,"order_id":0,"order_dow":0,"order_hour_of_day":0,"order_woy":0,"days_since_prior_order":null,"product_name":null,"product_sku":0,"image":null,"timestamp":1660157951000,"ecommerce":{"items":[{"item_name":"Donut Friday Scented T-Shirt","item_id":"67890","price":33.75,"item_brand":"Google","item_category":"Apparel","item_category_2":"Mens","item_category_3":"Shirts","item_category_4":"Tshirts","item_variant":"Black","item_list_name":"Search Results","item_list_id":"SR123","index":1,"quantity":2}]},"client_id":"1686224283","page_previous":null,"page":null,"event_datetime":"2022-08-10 06:59:11"}'
    
  6. 啟動第三個 Cloud Scheduler 工作。

    gcloud scheduler jobs run --location=LOCATION transactions
    

查看結果

查看寫入 BigQuery 資料表的資料。執行下列查詢,在 BigQuery 中查看結果。這個管道執行時,您會看到每分鐘都有新資料列附加到 BigQuery 資料表。

您可能需要等待資料填入表格。

bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_inventory_data"'`'
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.Retail_Store.clean_transaction_data"'`'

清除所用資源

如要避免系統向您的 Google Cloud 帳戶收取本教學課程中所用資源的相關費用,請刪除含有該項資源的專案,或者保留專案但刪除個別資源。

刪除專案

如要避免付費,最簡單的方法就是刪除您為了本教學課程所建立的專案。 Google Cloud

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

刪除個別資源

如要重複使用專案,請刪除您為本教學課程建立的資源。

清除 Google Cloud 專案資源

  1. 如要刪除 Cloud Scheduler 工作,請使用 gcloud scheduler jobs delete 指令。

     gcloud scheduler jobs delete transactions --location=LOCATION
    
     gcloud scheduler jobs delete inventory --location=LOCATION
    
     gcloud scheduler jobs delete clickstream --location=LOCATION
    
  2. 如要刪除 Pub/Sub 訂閱項目和主題,請使用 gcloud pubsub subscriptions deletegcloud pubsub topics delete 指令。

    gcloud pubsub subscriptions delete SUBSCRIPTION_NAME
    gcloud pubsub topics delete TOPIC_NAME
    
  3. 如要刪除 BigQuery 資料表,請使用 bq rm 指令。

    bq rm -f -t PROJECT_ID:Retail_Store.Store_Locations
    
  4. 刪除 BigQuery 資料集。單獨使用資料集不會產生任何費用。

    bq rm -r -f -d PROJECT_ID:Retail_Store
    
    bq rm -r -f -d PROJECT_ID:Retail_Store_Aggregations
    
  5. 如要刪除 Bigtable 執行個體,請使用 cbt deleteinstance 指令。單獨使用值區不會產生任何費用。

    cbt deleteinstance aggregate-tables
    
  6. 如要刪除 Cloud Storage bucket 和其中的物件,請使用 gcloud storage rm 指令。單獨使用值區不會產生任何費用。

    gcloud storage rm gs://BUCKET_NAME --recursive
    

撤銷憑證

  1. 撤銷您授予使用者代管工作者服務帳戶的角色。 針對下列每個 IAM 角色,執行一次下列指令:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
    • roles/bigtable.admin
    • roles/bigquery.jobUser
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:retailpipeline@PROJECT_ID.iam.gserviceaccount.com \
        --role=ROLE
  2. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  3. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

後續步驟