Pub/Sub Proto 到 BigQuery 範本

「將 Pub/Sub 原型設計資料傳送至 BigQuery」範本是一種串流管道,可將 Pub/Sub 訂閱項目中的原型設計資料擷取至 BigQuery 資料表。寫入 BigQuery 資料表時發生的所有錯誤都會串流至未處理的 Pub/Sub 主題。

您可以提供 JavaScript 使用者定義函式 (UDF) 來轉換資料,執行 UDF 時發生的錯誤可以傳送至另一個 Pub/Sub 主題,也可以傳送至與 BigQuery 錯誤相同的未處理主題。

針對這個情境執行 Dataflow 管道之前,請考慮Pub/Sub BigQuery 訂閱項目搭配 UDF 是否符合您的需求。

管道相關規定

  • 輸入 Pub/Sub 訂閱項目必須已存在。
  • Proto 記錄的結構定義檔案必須存在於 Cloud Storage。
  • 輸出 Pub/Sub 主題必須已存在。
  • 輸出 BigQuery 資料集必須已存在。
  • 如果 BigQuery 資料表已存在,則無論 createDisposition 值為何,都必須具備與 Proto 資料相符的結構定義。

範本參數

參數 說明
protoSchemaPath 獨立原型設計結構定義檔案的 Cloud Storage 位置。例如:gs://path/to/my/file.pb。您可以使用 protoc 指令的 --descriptor_set_out 旗標產生這個檔案。 --include_imports 標記可確保檔案內容獨立完整。
fullMessageName 完整的 Proto 訊息名稱。例如 package.name.MessageName,其中 package.name 是為 package 陳述式提供的值,而非 java_package 陳述式。
inputSubscription 要讀取的 Pub/Sub 輸入訂閱項目。例如 projects/<project>/subscriptions/<subscription>
outputTopic 用於未處理記錄的 Pub/Sub 主題。例如 projects/<project-id>/topics/<topic-name>
outputTableSpec BigQuery 輸出資料表的位置。例如:my-project:my_dataset.my_table。視指定的 createDisposition 而定,系統可能會使用輸入結構定義檔自動建立輸出資料表。
preserveProtoFieldNames 選用:true,在 JSON 中保留原始 Proto 欄位名稱。false,即可使用更標準的 JSON 名稱。 舉例來說,false 會將 field_name 變更為 fieldName。(預設值:false)
bigQueryTableSchemaPath 選填:BigQuery 結構定義路徑的 Cloud Storage 路徑。例如,gs://path/to/my/schema.json。如未提供,系統會從 Proto 結構定義推斷結構定義。
javascriptTextTransformGcsPath 選用: 定義要使用的 JavaScript 使用者定義函式 (UDF) 的 .js 檔案 Cloud Storage URI。例如:gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName 選用: 您要使用的 JavaScript 使用者定義函式 (UDF) 名稱。舉例來說,如果您的 JavaScript 函式程式碼是 myTransform(inJson) { /*...do stuff...*/ },則函式名稱就是 myTransform。如需 JavaScript UDF 範例,請參閱「UDF 範例」。
javascriptTextTransformReloadIntervalMinutes 選用: 指定重新載入 UDF 的頻率 (以分鐘為單位)。如果值大於 0,Dataflow 會定期檢查 Cloud Storage 中的 UDF 檔案,並在檔案經過修改時重新載入 UDF。您可以在管道執行期間更新 UDF,不必重新啟動工作。如果值為 0,系統會停用 UDF 重新載入功能。預設值為 0。
udfOutputTopic 選用:儲存 UDF 錯誤的 Pub/Sub 主題。例如:projects/<project-id>/topics/<topic-name>。如未提供,系統會將 UDF 錯誤傳送至與 outputTopic 相同的主題。
writeDisposition 選用:BigQuery WriteDisposition。 例如 WRITE_APPENDWRITE_EMPTYWRITE_TRUNCATE。預設值:WRITE_APPEND
createDisposition 選用:BigQuery CreateDisposition。 例如:CREATE_IF_NEEDEDCREATE_NEVER。預設值:CREATE_IF_NEEDED
useStorageWriteApi 選用: 如果 true,管道會使用 BigQuery Storage Write API。預設值為 false。詳情請參閱「 使用 Storage Write API」。
useStorageWriteApiAtLeastOnce 選用:使用 Storage Write API 時,指定寫入語意。如要使用 「至少一次」語意,請將這個參數設為 true。如要使用「僅限一次」語意,請將參數設為 false。只有在 useStorageWriteApitrue 時,這項參數才會生效。預設值為 false
numStorageWriteApiStreams 選用: 使用 Storage Write API 時,指定寫入串流的數量。如果 useStorageWriteApitrue,且 useStorageWriteApiAtLeastOncefalse,則必須設定這個參數。
storageWriteApiTriggeringFrequencySec 選用: 使用 Storage Write API 時,指定觸發頻率 (以秒為單位)。如果 useStorageWriteApitrue,且 useStorageWriteApiAtLeastOncefalse,則必須設定這個參數。

使用者定義函式

您可以視需要撰寫使用者定義函式 (UDF) 來擴充這個範本。範本會針對每個輸入元素呼叫 UDF。元素酬載會序列化為 JSON 字串。詳情請參閱「為 Dataflow 範本建立使用者定義函式」。

函式規格

UDF 的規格如下:

  • 輸入:Pub/Sub 訊息資料欄位,序列化為 JSON 字串。
  • 輸出:符合 BigQuery 目的地資料表結構定義的 JSON 字串。
  • 執行範本

    控制台

    1. 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。
    2. 前往「依據範本建立工作」
    3. 在「工作名稱」欄位中,輸入專屬工作名稱。
    4. 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為 us-central1

      如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。

    5. 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取 the Pub/Sub Proto to BigQuery template。
    6. 在提供的參數欄位中輸入參數值。
    7. 按一下「Run Job」(執行工作)

    gcloud

    在殼層或終端機中執行範本:

    gcloud dataflow flex-template run JOB_NAME \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex \
        --parameters \
    schemaPath=SCHEMA_PATH,\
    fullMessageName=PROTO_MESSAGE_NAME,\
    inputSubscription=SUBSCRIPTION_NAME,\
    outputTableSpec=BIGQUERY_TABLE,\
    outputTopic=UNPROCESSED_TOPIC
      

    更改下列內容:

    • JOB_NAME: 您選擇的不重複工作名稱
    • REGION_NAME: 您要部署 Dataflow 工作的地區,例如 us-central1
    • VERSION: 您要使用的範本版本

      您可以使用下列值:

    • SCHEMA_PATH:Proto 結構定義檔案的 Cloud Storage 路徑 (例如 gs://MyBucket/file.pb)
    • PROTO_MESSAGE_NAME:Proto 訊息名稱 (例如 package.name.MessageName)
    • SUBSCRIPTION_NAME:Pub/Sub 輸入訂閱項目名稱
    • BIGQUERY_TABLE:BigQuery 輸出資料表名稱
    • UNPROCESSED_TOPIC:用於未處理佇列的 Pub/Sub 主題

    API

    如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_Proto_to_BigQuery_Flex",
          "parameters": {
              "schemaPath": "SCHEMA_PATH",
              "fullMessageName": "PROTO_MESSAGE_NAME",
              "inputSubscription": "SUBSCRIPTION_NAME",
              "outputTableSpec": "BIGQUERY_TABLE",
              "outputTopic": "UNPROCESSED_TOPIC"
          }
       }
    }
      

    更改下列內容:

    • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
    • JOB_NAME: 您選擇的不重複工作名稱
    • LOCATION: 您要部署 Dataflow 工作的地區,例如 us-central1
    • VERSION: 您要使用的範本版本

      您可以使用下列值:

    • SCHEMA_PATH:Proto 結構定義檔案的 Cloud Storage 路徑 (例如 gs://MyBucket/file.pb)
    • PROTO_MESSAGE_NAME:Proto 訊息名稱 (例如 package.name.MessageName)
    • SUBSCRIPTION_NAME:Pub/Sub 輸入訂閱項目名稱
    • BIGQUERY_TABLE:BigQuery 輸出資料表名稱
    • UNPROCESSED_TOPIC:用於未處理佇列的 Pub/Sub 主題

    後續步驟