串流資料產生器到 Pub/Sub、BigQuery 和 Cloud Storage 範本

串流資料產生器範本可依據使用者提供的結構定義,以指定頻率產生數量無限或固定數量的綜合記錄或訊息。相容的目的地包括 Pub/Sub 主題、BigQuery 資料表和 Cloud Storage 值區。

以下列舉幾個可能的用途:

  • 模擬大規模即時事件發布至 Pub/Sub 主題,以評估及判斷處理發布事件所需的消費者數量和大小。
  • 將合成資料產生至 BigQuery 資料表或 Cloud Storage bucket,以評估效能基準或做為概念驗證。

支援的接收器和編碼格式

下表說明這個範本支援的接收器和編碼格式:
JSON Avro Parquet
Pub/Sub
BigQuery
Cloud Storage

管道相關規定

  • 工作站服務帳戶需要指派 Dataflow 工作站 (roles/dataflow.worker) 角色。詳情請參閱「身分與存取權管理簡介」。
  • 建立結構定義檔案,其中包含所產生資料的 JSON 範本。這個範本使用 JSON 資料產生器程式庫,因此您可以為結構定義中的每個欄位提供各種假造函式。詳情請參閱 json-data-generator 說明文件

    例如:

    {
      "id": {{integer(0,1000)}},
      "name": "{{uuid()}}",
      "isInStock": {{bool()}}
    }
    
  • 將結構定義檔案上傳至 Cloud Storage bucket。
  • 執行前必須先有輸出目標。視接收器類型而定,目標必須是 Pub/Sub 主題、BigQuery 資料表或 Cloud Storage 值區。
  • 如果輸出編碼是 Avro 或 Parquet,請建立 Avro 結構定義檔案,並儲存在 Cloud Storage 位置。
  • 視所需目的地而定,為工作人員服務帳戶指派額外的 IAM 角色。
    目的地 其他必要 IAM 角色 套用至哪個資源
    Pub/Sub Pub/Sub 發布者 (roles/pubsub.publisher)
    (詳情請參閱「使用 IAM 控管 Pub/Sub 存取權」)
    Pub/Sub 主題
    BigQuery BigQuery 資料編輯者 (roles/bigquery.dataEditor)
    (詳情請參閱「使用 IAM 控管 BigQuery 存取權」)
    BigQuery 資料集
    Cloud Storage Cloud Storage 物件管理員 (roles/storage.objectAdmin)
    (詳情請參閱「使用 IAM 控制 Cloud Storage 存取權」)
    Cloud Storage 值區

範本參數

參數 說明
schemaLocation 結構定義檔案的位置。例如 gs://mybucket/filename.json
qps 每秒要發布的訊息數。例如 100
sinkType (選用) 輸出接收器類型。可能的值為 PUBSUBBIGQUERYGCS。預設值為 PUBSUB。
outputType (選用) 輸出編碼類型。可能的值為 JSONAVROPARQUET。預設值為 JSON。
avroSchemaLocation (選用) AVRO 結構定義檔案的位置。如果 outputType 是 AVRO 或 PARQUET,則為必填。例如:gs://mybucket/filename.avsc
topic (選用) 管道應發布資料的 Pub/Sub 主題名稱。如果 sinkType 是 Pub/Sub,則為必填欄位。例如:projects/my-project-id/topics/my-topic-id
outputTableSpec (選用) 輸出 BigQuery 資料表的名稱。如果 sinkType 是 BigQuery,則為必填欄位。例如:my-project-ID:my_dataset_name.my-table-name
writeDisposition (選用) BigQuery 寫入配置。可能的值為 WRITE_APPENDWRITE_EMPTYWRITE_TRUNCATE。預設值為 WRITE_APPEND。
outputDeadletterTable (選用) 儲存失敗記錄的輸出 BigQuery 資料表名稱。如果未提供,管道會在執行期間建立資料表,名稱為 {output_table_name}_error_records。例如 my-project-ID:my_dataset_name.my-table-name
outputDirectory (選用) 輸出 Cloud Storage 位置的路徑。如果 sinkType 是 Cloud Storage,則為必填欄位。例如:gs://mybucket/pathprefix/
outputFilenamePrefix (選用) 寫入 Cloud Storage 的輸出檔案名稱前置字串。預設值為 output-。
windowDuration (選用) 輸出內容寫入 Cloud Storage 的時間間隔。預設值為 1m (也就是 1 分鐘)。
numShards (選用) 輸出資料分割數量上限。如果 sinkType 是 Cloud Storage,則為必填欄位,且應設為 1 以上的數字。
messagesLimit (選用) 輸出訊息數量上限。預設值為 0,表示沒有限制。
autoscalingAlgorithm (選用) 用於自動調度工作站資源的演算法。可能的值包括 THROUGHPUT_BASED (啟用自動調度資源模式) 或 NONE (停用該模式)。
maxNumWorkers (選用) 工作站機器的數量上限。例如 10

執行範本

控制台

  1. 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。
  2. 前往「依據範本建立工作」
  3. 在「工作名稱」欄位中,輸入專屬工作名稱。
  4. 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為 us-central1

    如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。

  5. 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取 the Streaming Data Generator template。
  6. 在提供的參數欄位中輸入參數值。
  7. 按一下「Run Job」(執行工作)

gcloud

在殼層或終端機中執行範本:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
  

更改下列內容:

  • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
  • REGION_NAME: 您要部署 Dataflow 工作的地區,例如 us-central1
  • JOB_NAME: 您選擇的不重複工作名稱
  • VERSION: 您要使用的範本版本

    您可以使用下列值:

  • SCHEMA_LOCATION:Cloud Storage 中結構定義檔案的路徑。例如:gs://mybucket/filename.json
  • QPS:每秒發布的訊息數
  • PUBSUB_TOPIC:輸出 Pub/Sub 主題。例如:projects/my-project-id/topics/my-topic-id

API

如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "qps": "QPS",
          "topic": "PUBSUB_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Streaming_Data_Generator",
   }
}
  

更改下列內容:

  • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
  • LOCATION: 您要部署 Dataflow 工作的地區,例如 us-central1
  • JOB_NAME: 您選擇的不重複工作名稱
  • VERSION: 您要使用的範本版本

    您可以使用下列值:

  • SCHEMA_LOCATION:Cloud Storage 中結構定義檔案的路徑。例如:gs://mybucket/filename.json
  • QPS:每秒發布的訊息數
  • PUBSUB_TOPIC:輸出 Pub/Sub 主題。例如:projects/my-project-id/topics/my-topic-id

後續步驟