Pub/Sub 到 BigQuery (含 Python UDF) 範本

「Pub/Sub 到 BigQuery (含 Python UDF)」範本是一個串流管道,可從 Pub/Sub 讀取 JSON 格式的訊息,並將這些訊息寫入 BigQuery 資料表。您可以視需要提供以 Python 編寫的使用者定義函式 (UDF),處理傳入的訊息。

管道相關規定

  • BigQuery 資料表必須存在且具備結構定義。
  • Pub/Sub 訊息資料必須使用 JSON 格式,否則您必須提供可將訊息資料轉換為 JSON 的 UDF。JSON 資料必須符合 BigQuery 資料表結構定義。舉例來說,如果 JSON 酬載的格式為 {"k1":"v1", "k2":"v2"},BigQuery 資料表就必須有兩個名為 k1k2 的字串欄。
  • 指定 inputSubscriptioninputTopic 參數,但不能兩者同時指定。

範本參數

參數 說明
outputTableSpec 要寫入的 BigQuery 資料表,格式為 "PROJECT_ID:DATASET_NAME.TABLE_NAME"
inputSubscription 選用:要讀取的 Pub/Sub 訂閱,格式為 "projects/PROJECT_ID/subscriptions/SUBCRIPTION_NAME"
inputTopic 選用:要讀取的 Pub/Sub 主題,格式為 "projects/PROJECT_ID/topics/TOPIC_NAME"
outputDeadletterTable 無法到達輸出資料表的訊息所屬 BigQuery 資料表,格式為 "PROJECT_ID:DATASET_NAME.TABLE_NAME"。如果資料表不存在,系統會在管道執行時建立。如未指定此參數,則會改用 "OUTPUT_TABLE_SPEC_error_records" 值。
pythonExternalTextTransformGcsPath 選用: 定義要使用的使用者定義函式 (UDF) 的 Python 程式碼檔案 Cloud Storage URI。例如:gs://my-bucket/my-udfs/my_file.py
pythonExternalTextTransformFunctionName 選用: 要使用的 Python 使用者定義函式 (UDF) 名稱。
useStorageWriteApi 選用: 如果 true,管道會使用 BigQuery Storage Write API。預設值為 false。詳情請參閱「 使用 Storage Write API」。
useStorageWriteApiAtLeastOnce 選用:使用 Storage Write API 時,指定寫入語意。如要使用 「至少一次」語意,請將這個參數設為 true。如要使用「僅限一次」語意,請將參數設為 false。只有在 useStorageWriteApitrue 時,這項參數才會生效。預設值為 false
numStorageWriteApiStreams 選用: 使用 Storage Write API 時,指定寫入串流的數量。如果 useStorageWriteApitrue,且 useStorageWriteApiAtLeastOncefalse,則必須設定這個參數。
storageWriteApiTriggeringFrequencySec 選用: 使用 Storage Write API 時,指定觸發頻率 (以秒為單位)。如果 useStorageWriteApitrue,且 useStorageWriteApiAtLeastOncefalse,則必須設定這個參數。

使用者定義函式

您可以視需要撰寫使用者定義函式 (UDF) 來擴充這個範本。範本會針對每個輸入元素呼叫 UDF。元素酬載會序列化為 JSON 字串。詳情請參閱「為 Dataflow 範本建立使用者定義函式」。

函式規格

UDF 的規格如下:

  • 輸入:Pub/Sub 訊息資料欄位,序列化為 JSON 字串。
  • 輸出:符合 BigQuery 目的地資料表結構定義的 JSON 字串。
  • 執行範本

    控制台

    1. 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。
    2. 前往「依據範本建立工作」
    3. 在「工作名稱」欄位中,輸入專屬工作名稱。
    4. 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為 us-central1

      如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。

    5. 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取 the Pub/Sub to BigQuery with Python UDF template。
    6. 在提供的參數欄位中輸入參數值。
    7. 選用:如要從「僅需處理一次」切換至「至少一次」串流模式,請選取「至少一次」
    8. 按一下「Run Job」(執行工作)

    gcloud

    在殼層或終端機中執行範本:

    gcloud dataflow flex-template run JOB_NAME \
        --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_BigQuery_Xlang \
        --region REGION_NAME \
        --staging-location STAGING_LOCATION \
        --parameters \
    inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
    outputTableSpec=PROJECT_ID:DATASET.TABLE_NAME

    更改下列內容:

    • JOB_NAME: 您選擇的不重複工作名稱
    • REGION_NAME: 您要部署 Dataflow 工作的地區,例如 us-central1
    • VERSION: 您要使用的範本版本

      您可以使用下列值:

    • STAGING_LOCATION:用於暫存本機檔案的位置 (例如 gs://your-bucket/staging)
    • TOPIC_NAME:您的 Pub/Sub 主題名稱
    • DATASET:您的 BigQuery 資料集
    • TABLE_NAME:BigQuery 資料表名稱

    API

    如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
           "inputTopic": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME",
           "outputTableSpec": "PROJECT_ID:DATASET.TABLE_NAME"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_BigQuery_Xlang",
       }
    }

    更改下列內容:

    • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
    • JOB_NAME: 您選擇的不重複工作名稱
    • LOCATION: 您要部署 Dataflow 工作的地區,例如 us-central1
    • VERSION: 您要使用的範本版本

      您可以使用下列值:

    • STAGING_LOCATION:用於暫存本機檔案的位置 (例如 gs://your-bucket/staging)
    • TOPIC_NAME:您的 Pub/Sub 主題名稱
    • DATASET:您的 BigQuery 資料集
    • TABLE_NAME:BigQuery 資料表名稱

    後續步驟