Cloud Storage Text 到 BigQuery (串流) (含 Python UDF) 範本

Cloud Storage Text 到 BigQuery 管道為串流管道,可串流儲存於 Cloud Storage 的文字檔案,使用您提供的 Python 使用者定義函式 (UDF) 進行轉換,然後將結果附加至 BigQuery。

管道會無限期持續執行,直到透過取消 (而非排除) 手動終止。這是因為管道使用 Watch 轉換,而 Watch 是可分割的 DoFn,不支援排除。

管道相關規定

  • 建立 JSON 檔案,說明 BigQuery 中輸出資料表的結構定義。

    確保有一個標題為 fields 的最高層級 JSON 陣列,且其中的內容遵循 {"name": "COLUMN_NAME", "type": "DATA_TYPE"} 這個格式。例如:

    {
      "fields": [
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "INTEGER"
        }
      ]
    }
  • 建立一個 Python (.py) 檔案,其中包含提供文字行轉換邏輯的 UDF 函式。函式必須傳回 JSON 字串。

    以下範例會將 CSV 檔案中的每一行都拆開,使用這些值建立 JSON 物件,然後傳回 JSON 字串:

    import json
    def process(value):
      data = value.split(',')
      obj = { 'name': data[0], 'age': int(data[1]) }
      return json.dumps(obj)

範本參數

參數 說明
pythonExternalTextTransformGcsPath 定義要使用的使用者定義函式 (UDF) 的 Python 程式碼檔案 Cloud Storage URI。例如:gs://my-bucket/my-udfs/my_file.py
pythonExternalTextTransformFunctionName 要使用的 Python 使用者定義函式 (UDF) 名稱。
JSONPath BigQuery 結構定義檔案的 Cloud Storage 位置,以 JSON 描述。例如 gs://path/to/my/schema.json
outputTable 完全合格的 BigQuery 資料表。 例如:my-project:dataset.table
inputFilePattern 要處理文字的 Cloud Storage 位置。 例如 gs://my-bucket/my-files/text.txt
bigQueryLoadingTemporaryDirectory BigQuery 載入程序的暫存目錄,例如:gs://my-bucket/my-files/temp_dir
outputDeadletterTable 無法到達輸出資料表的訊息資料表。 例如:my-project:dataset.my-unprocessed-table。如果該資料表不存在,將會於管道執行期間建立。 如果未指定,則使用 <outputTableSpec>_error_records

使用者定義函式

這個範本需要 UDF 來剖析輸入檔案,詳情請參閱「管道需求」。範本會針對每個輸入檔案中的每一行文字呼叫 UDF。如要進一步瞭解如何建立 UDF,請參閱「為 Dataflow 範本建立使用者定義函式」。

函式規格

UDF 的規格如下:

  • 輸入:輸入檔案中的單行文字。
  • 輸出:符合 BigQuery 目的地資料表結構定義的 JSON 字串。

執行範本

控制台

  1. 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。
  2. 前往「依據範本建立工作」
  3. 在「工作名稱」欄位中,輸入專屬工作名稱。
  4. 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為 us-central1

    如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。

  5. 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取 the Cloud Storage Text to BigQuery (Stream) with Python UDF template。
  6. 在提供的參數欄位中輸入參數值。
  7. 按一下「Run Job」(執行工作)

gcloud

在殼層或終端機中執行範本:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Stream_GCS_Text_to_BigQuery_Xlang \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
pythonExternalTextTransformGcsPath=PATH_TO_PYTHON_UDF_FILE,\
pythonExternalTextTransformFunctionName=PYTHON_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

更改下列內容:

  • JOB_NAME: 您選擇的不重複工作名稱
  • REGION_NAME: 您要部署 Dataflow 工作的地區,例如 us-central1
  • VERSION: 您要使用的範本版本

    您可以使用下列值:

  • STAGING_LOCATION:用於暫存本機檔案的位置 (例如 gs://your-bucket/staging)
  • PYTHON_FUNCTION: 您要使用的 Python 使用者定義函式 (UDF) 名稱。
  • PATH_TO_BIGQUERY_SCHEMA_JSON:指向 JSON 檔案 (內含結構定義) 的 Cloud Storage 路徑
  • PATH_TO_PYTHON_UDF_FILE: 定義要使用的使用者定義函式 (UDF) 的 Python 程式碼檔案 Cloud Storage URI。例如:gs://my-bucket/my-udfs/my_file.py
  • PATH_TO_TEXT_DATA:文字資料集的 Cloud Storage 路徑
  • BIGQUERY_TABLE:BigQuery 資料表名稱
  • BIGQUERY_UNPROCESSED_TABLE:未處理訊息的 BigQuery 資料表名稱
  • PATH_TO_TEMP_DIR_ON_GCS:暫時目錄的 Cloud Storage 路徑

API

如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
       "pythonExternalTextTransformFunctionName": "PYTHON_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "pythonExternalTextTransformGcsPath": "PATH_TO_PYTHON_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Stream_GCS_Text_to_BigQuery_Xlang",
   }
}

更改下列內容:

  • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
  • JOB_NAME: 您選擇的不重複工作名稱
  • LOCATION: 您要部署 Dataflow 工作的地區,例如 us-central1
  • VERSION: 您要使用的範本版本

    您可以使用下列值:

  • STAGING_LOCATION:用於暫存本機檔案的位置 (例如 gs://your-bucket/staging)
  • PYTHON_FUNCTION: 您要使用的 Python 使用者定義函式 (UDF) 名稱。
  • PATH_TO_BIGQUERY_SCHEMA_JSON:指向 JSON 檔案 (內含結構定義) 的 Cloud Storage 路徑
  • PATH_TO_PYTHON_UDF_FILE: 定義要使用的使用者定義函式 (UDF) 的 Python 程式碼檔案 Cloud Storage URI。例如:gs://my-bucket/my-udfs/my_file.py
  • PATH_TO_TEXT_DATA:文字資料集的 Cloud Storage 路徑
  • BIGQUERY_TABLE:BigQuery 資料表名稱
  • BIGQUERY_UNPROCESSED_TABLE:未處理訊息的 BigQuery 資料表名稱
  • PATH_TO_TEMP_DIR_ON_GCS:暫時目錄的 Cloud Storage 路徑

後續步驟