Datastream 到 BigQuery (串流) 範本

「將 Datastream 內容串流至 BigQuery」範本是一種串流管道,可讀取 Datastream 資料並複製到 BigQuery。範本會使用 Pub/Sub 通知從 Cloud Storage 讀取資料,並將資料複製到以時間為依據進行分割的 BigQuery 暫存資料表。複製完成後,範本會在 BigQuery 中執行 MERGE,將所有異動資料擷取 (CDC) 變更插入來源資料表的副本。指定 gcsPubSubSubscription 參數,從 Pub/Sub 通知讀取資料,或提供 inputFilePattern 參數,直接從 Cloud Storage 中的檔案讀取資料。

範本會處理複寫作業管理的 BigQuery 資料表建立和更新作業。需要資料定義語言 (DDL) 時,系統會回呼 Datastream,擷取來源資料表結構定義並轉換為 BigQuery 資料類型。支援的作業包括:

  • 插入資料時,系統會建立新資料表。
  • 系統會將新資料欄新增至 BigQuery 資料表,並將初始值設為空值。
  • BigQuery 會忽略已捨棄的資料欄,且未來值為空值。
  • 系統會將重新命名的資料欄新增至 BigQuery。
  • 類型變更不會傳播至 BigQuery。

建議使用至少一次串流模式執行此管道,因為範本會在將資料從臨時 BigQuery 資料表合併至主要 BigQuery 資料表時,執行重複資料刪除作業。管線中的這個步驟表示,使用「只傳送一次」串流模式不會帶來額外好處。

管道相關規定

  • 準備好或已複製資料的 Datastream 串流。
  • 為 Datastream 資料啟用 Cloud Storage Pub/Sub 通知
  • 已建立 BigQuery 目的地資料集,且 Compute Engine 服務帳戶已獲授管理員存取權。
  • 來源資料表必須有主鍵,才能建立目的地副本資料表。
  • MySQL 或 Oracle 來源資料庫。不支援 PostgreSQL 和 SQL Server 資料庫。

範本參數

必要參數

  • inputFilePattern:Cloud Storage 中 Datastream 檔案輸出的檔案位置,格式為 gs://<BUCKET_NAME>/<ROOT_PATH>/
  • inputFileFormat:Datastream 產生的輸出檔案格式。允許的值為 avrojson。預設值為 avro
  • gcsPubSubSubscription:Cloud Storage 用於通知 Dataflow 有新檔案可供處理的 Pub/Sub 訂閱項目,格式為 projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>
  • outputStagingDatasetTemplate:包含暫存資料表的資料集名稱。這個參數支援範本,例如 {_metadata_dataset}_logmy_dataset_log。這個參數通常是資料集名稱。預設值為 {_metadata_dataset}。注意:如果是 MySQL 來源,資料庫名稱會對應至 {_metadata_schema},而非 {_metadata_dataset}
  • outputDatasetTemplate:包含副本資料表的資料集名稱。這個參數支援範本,例如 {_metadata_dataset}my_dataset。這個參數通常是資料集名稱。預設值為 {_metadata_dataset}。注意:如果是 MySQL 來源,資料庫名稱會對應至 {_metadata_schema},而非 {_metadata_dataset}
  • deadLetterQueueDirectory:Dataflow 用來寫入無法傳送郵件佇列輸出的路徑。這個路徑不得與 Datastream 檔案輸出路徑相同。預設值為 empty

選用參數

  • streamName:要輪詢結構定義資訊的串流名稱或範本。預設值為 {_metadata_stream}。預設值通常就已足夠。
  • rfcStartDateTime:從 Cloud Storage 擷取資料時使用的開始日期時間 (https://tools.ietf.org/html/rfc3339)。預設值為 1970-01-01T00:00:00.00Z
  • fileReadConcurrency:要讀取的並行 DataStream 檔案數量。預設值為 10
  • outputProjectId:Google Cloud 專案的 ID,其中包含要輸出資料的 BigQuery 資料集。這項參數的預設值是執行 Dataflow 管道的專案。
  • outputStagingTableNameTemplate:用於命名暫存資料表的範本。例如,{_metadata_table}。預設值為 {_metadata_table}_log
  • outputTableNameTemplate:用於副本資料表名稱的範本,例如 {_metadata_table}。預設值為 {_metadata_table}
  • ignoreFields:以半形逗號分隔的欄位,在 BigQuery 中會遭到忽略。預設值為 _metadata_stream,_metadata_schema,_metadata_table,_metadata_source,_metadata_tx_id,_metadata_dlq_reconsumed,_metadata_primary_keys,_metadata_error,_metadata_retry_count。例如:_metadata_stream,_metadata_schema
  • mergeFrequencyMinutes:指定資料表合併作業之間的分鐘數。預設值為 5
  • dlqRetryMinutes:重試 DLQ 的間隔分鐘數。預設值為 10
  • dataStreamRootUrl:Datastream API 根網址。預設值為:https://datastream.googleapis.com/
  • applyMerge:是否要為工作停用 MERGE 查詢。預設值為 true
  • mergeConcurrency:並行 BigQuery MERGE 查詢的數量。只有在 applyMerge 設為 true 時才會生效。預設值為 30
  • partitionRetentionDays:執行 BigQuery 合併作業時,用於保留分區的天數。預設值為 1
  • useStorageWriteApiAtLeastOnce:只有在啟用 Use BigQuery Storage Write API 時,這個參數才會生效。如果 true,則 Storage Write API 會使用至少一次語意。否則會使用「僅限一次」語意。預設值為 false
  • javascriptTextTransformGcsPath:定義要使用的 JavaScript 使用者定義函式 (UDF) 的 .js 檔案 Cloud Storage URI。例如:gs://my-bucket/my-udfs/my_file.js
  • javascriptTextTransformFunctionName:要使用的 JavaScript 使用者定義函式 (UDF) 名稱。舉例來說,如果您的 JavaScript 函式程式碼是 myTransform(inJson) { /*...do stuff...*/ },則函式名稱就是 myTransform。如需 JavaScript UDF 範例,請參閱 UDF 範例 (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)。
  • javascriptTextTransformReloadIntervalMinutes:指定重新載入 UDF 的頻率 (以分鐘為單位)。如果值大於 0,Dataflow 會定期檢查 Cloud Storage 中的 UDF 檔案,並在檔案經過修改時重新載入 UDF。您可以在管道執行期間更新 UDF,不必重新啟動工作。如果值為 0,系統會停用 UDF 重新載入功能。預設值為 0
  • pythonTextTransformGcsPath:包含使用者定義函式的 Python 程式碼的 Cloud Storage 路徑模式。例如:gs://your-bucket/your-transforms/*.py
  • pythonRuntimeVersion:這個 Python UDF 要使用的執行階段版本。
  • pythonTextTransformFunctionName:要從 JavaScript 檔案呼叫的函式名稱。只能使用英文字母、數字和底線,例如:transform_udf1
  • runtimeRetries:執行階段重試次數上限。預設值為 5。
  • useStorageWriteApi:如果為 true,管道會使用 BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api)。預設值為 false。詳情請參閱「使用 Storage Write API」(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)。
  • numStorageWriteApiStreams:使用 Storage Write API 時,指定寫入串流的數量。如果 useStorageWriteApitrue,且 useStorageWriteApiAtLeastOncefalse,則必須設定這個參數。預設值為 0。
  • storageWriteApiTriggeringFrequencySec:使用 Storage Write API 時,指定觸發頻率 (以秒為單位)。如果 useStorageWriteApitrue,且 useStorageWriteApiAtLeastOncefalse,則必須設定這個參數。

使用者定義函式

您可以視需要撰寫使用者定義函式 (UDF) 來擴充這個範本。範本會針對每個輸入元素呼叫 UDF。元素酬載會序列化為 JSON 字串。詳情請參閱「為 Dataflow 範本建立使用者定義函式」。

函式規格

UDF 的規格如下:

  • 輸入:以 JSON 字串序列化的 CDC 資料。
  • 輸出:符合 BigQuery 目的地資料表結構定義的 JSON 字串。
  • 執行範本

    控制台

    1. 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。
    2. 前往「依據範本建立工作」
    3. 在「工作名稱」欄位中,輸入專屬工作名稱。
    4. 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為 us-central1

      如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。

    5. 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取 the Datastream to BigQuery template。
    6. 在提供的參數欄位中輸入參數值。
    7. 選用:如要從「僅需處理一次」切換至「至少一次」串流模式,請選取「至少一次」
    8. 按一下「Run Job」(執行工作)

    gcloud

    在殼層或終端機中執行範本:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --enable-streaming-engine \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_BigQuery \
        --parameters \
    inputFilePattern=GCS_FILE_PATH,\
    gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
    outputStagingDatasetTemplate=BIGQUERY_DATASET,\
    outputDatasetTemplate=BIGQUERY_DATASET,\
    outputStagingTableNameTemplate=BIGQUERY_TABLE,\
    outputTableNameTemplate=BIGQUERY_TABLE_log
      

    更改下列內容:

    • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
    • JOB_NAME: 您選擇的不重複工作名稱
    • REGION_NAME: 您要部署 Dataflow 工作的地區,例如 us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH:Datastream 資料的 Cloud Storage 路徑。例如:gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME:要從中讀取變更檔案的 Pub/Sub 訂閱項目。例如:projects/my-project-id/subscriptions/my-subscription-id
    • :您的 BigQuery 資料集名稱。BIGQUERY_DATASET
    • BIGQUERY_TABLE:BigQuery 資料表範本。例如:{_metadata_schema}_{_metadata_table}_log

    API

    如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
    
              "inputFilePattern": "GCS_FILE_PATH",
              "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
              "outputStagingDatasetTemplate": "BIGQUERY_DATASET",
              "outputDatasetTemplate": "BIGQUERY_DATASET",
              "outputStagingTableNameTemplate": "BIGQUERY_TABLE",
              "outputTableNameTemplate": "BIGQUERY_TABLE_log"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_BigQuery",
       }
    }
      

    更改下列內容:

    • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
    • JOB_NAME: 您選擇的不重複工作名稱
    • LOCATION: 您要部署 Dataflow 工作的地區,例如 us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH:Datastream 資料的 Cloud Storage 路徑。例如:gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME:要從中讀取變更檔案的 Pub/Sub 訂閱項目。例如:projects/my-project-id/subscriptions/my-subscription-id
    • :您的 BigQuery 資料集名稱。BIGQUERY_DATASET
    • BIGQUERY_TABLE:BigQuery 資料表範本。例如:{_metadata_schema}_{_metadata_table}_log

    後續步驟