Pub/Sub 訂閱項目到 BigQuery 範本

「Pub/Sub 訂閱項目到 BigQuery」範本為串流管道,可從 Pub/Sub 訂閱項目讀取 JSON 格式訊息,並將其寫入 BigQuery 資料表。如要將 Pub/Sub 資料移到 BigQuery,這個範本可以做為快速的因應方式。這個範本能從 Pub/Sub 讀取 JSON 格式的訊息,然後再將其轉換為 BigQuery 元素。

針對這個情境執行 Dataflow 管道之前,請考慮Pub/Sub BigQuery 訂閱項目搭配 UDF 是否符合您的需求。

管道相關規定

  • Pub/Sub 訊息的 data 欄位必須使用 JSON 格式,詳情請參閱這份 JSON 指南。 舉例來說,格式為 {"k1":"v1", "k2":"v2"}data 欄位值可以插入 BigQuery 資料表,並以兩欄 (分別命名為 k1k2 並具有字串資料類型) 形式呈現。
  • 在執行管道前,輸出資料表必須已存在。表格結構定義必須與輸入的 JSON 物件相符。

範本參數

必要參數

  • outputTableSpec:BigQuery 輸出資料表位置,格式為 <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>
  • inputSubscription:要讀取的 Pub/Sub 輸入訂閱,格式為 projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION>

選用參數

  • outputDeadletterTable:用於儲存無法送達輸出資料表的訊息的 BigQuery 資料表,格式為 <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>。如果該資料表不存在,將會於管道執行期間建立。如未指定,則會使用 OUTPUT_TABLE_SPEC_error_records
  • javascriptTextTransformGcsPath:定義要使用的 JavaScript 使用者定義函式 (UDF) 的 .js 檔案 Cloud Storage URI。例如:gs://my-bucket/my-udfs/my_file.js
  • javascriptTextTransformFunctionName:要使用的 JavaScript 使用者定義函式 (UDF) 名稱。舉例來說,如果您的 JavaScript 函式程式碼是 myTransform(inJson) { /*...do stuff...*/ },則函式名稱就是 myTransform。如需 JavaScript UDF 範例,請參閱 UDF 範例 (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)。
  • javascriptTextTransformReloadIntervalMinutes:定義工作人員檢查 JavaScript UDF 變更以重新載入檔案的間隔。預設值為 0。

使用者定義函式

您可以視需要撰寫使用者定義函式 (UDF) 來擴充這個範本。範本會針對每個輸入元素呼叫 UDF。元素酬載會序列化為 JSON 字串。詳情請參閱「為 Dataflow 範本建立使用者定義函式」。

函式規格

UDF 的規格如下:

  • 輸入:Pub/Sub 訊息資料欄位,序列化為 JSON 字串。
  • 輸出:符合 BigQuery 目的地資料表結構定義的 JSON 字串。
  • 執行範本

    控制台

    1. 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。
    2. 前往「依據範本建立工作」
    3. 在「工作名稱」欄位中,輸入專屬工作名稱。
    4. 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為 us-central1

      如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。

    5. 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取 the Pub/Sub Subscription to BigQuery template。
    6. 在提供的參數欄位中輸入參數值。
    7. 選用:如要從「僅需處理一次」切換至「至少一次」串流模式,請選取「至少一次」
    8. 按一下「Run Job」(執行工作)

    gcloud

    在殼層或終端機中執行範本:

    gcloud dataflow jobs run JOB_NAME \
        --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/PubSub_Subscription_to_BigQuery \
        --region REGION_NAME \
        --staging-location STAGING_LOCATION \
        --parameters \
    inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
    outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME,\
    outputDeadletterTable=PROJECT_ID:DATASET.TABLE_NAME

    更改下列內容:

    • JOB_NAME: 您選擇的不重複工作名稱
    • REGION_NAME: 您要部署 Dataflow 工作的地區,例如 us-central1
    • VERSION: 您要使用的範本版本

      您可以使用下列值:

    • STAGING_LOCATION:用於暫存本機檔案的位置 (例如 gs://your-bucket/staging)
    • SUBSCRIPTION_NAME:您的 Pub/Sub 訂閱項目名稱
    • DATASET:您的 BigQuery 資料集
    • TABLE_NAME:BigQuery 資料表名稱

    API

    如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/PubSub_Subscription_to_BigQuery
    {
       "jobName": "JOB_NAME",
       "parameters": {
           "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
           "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
       },
       "environment": {
           "ipConfiguration": "WORKER_IP_UNSPECIFIED",
           "additionalExperiments": []
       },
    }

    更改下列內容:

    • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
    • JOB_NAME: 您選擇的不重複工作名稱
    • LOCATION: 您要部署 Dataflow 工作的地區,例如 us-central1
    • VERSION: 您要使用的範本版本

      您可以使用下列值:

    • STAGING_LOCATION:用於暫存本機檔案的位置 (例如 gs://your-bucket/staging)
    • SUBSCRIPTION_NAME:您的 Pub/Sub 訂閱項目名稱
    • DATASET:您的 BigQuery 資料集
    • TABLE_NAME:BigQuery 資料表名稱

    後續步驟