Cloud Storage Text 到 BigQuery (串流) 範本

Cloud Storage Text 到 BigQuery 管道為串流管道,可串流儲存於 Cloud Storage 的文字檔案,使用您提供的 JavaScript 使用者定義函式 (UDF) 進行轉換,然後將結果附加至 BigQuery。

管道會無限期持續執行,直到透過取消 (而非排除) 手動終止。這是因為管道使用 Watch 轉換,而 Watch 是可分割的 DoFn,不支援排除。

管道相關規定

  • 建立 JSON 檔案,說明 BigQuery 中輸出資料表的結構定義。

    確保有一個標題為 fields 的最高層級 JSON 陣列,且其中的內容遵循 {"name": "COLUMN_NAME", "type": "DATA_TYPE"} 這個格式。例如:

    {
      "fields": [
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "INTEGER"
        }
      ]
    }
  • 建立一個 JavaScript (.js) 檔案,其中包含提供文字行轉換邏輯的 UDF 函式。函式必須傳回 JSON 字串。

    以下範例會將 CSV 檔案中的每一行都拆開,使用這些值建立 JSON 物件,然後傳回 JSON 字串:

    function process(inJson) {
      val = inJson.split(",");
    
      const obj = {
        "name": val[0],
        "age": parseInt(val[1])
      };
      return JSON.stringify(obj);
    }

範本參數

必要參數

  • inputFilePattern:Cloud Storage 中您要處理的文字的 gs:// 路徑,例如:gs://your-bucket/your-file.txt
  • JSONPath:JSON 檔案 (用來描述您儲存在 Cloud Storage 中 BigQuery 的結構定義) 的 gs:// 路徑,例如:gs://your-bucket/your-schema.json
  • outputTable:用來儲存已處理資料的 BigQuery 資料表位置。如果重複使用現有資料表,系統會覆寫該資料表。例如:<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>
  • javascriptTextTransformGcsPath:定義要使用的 JavaScript 使用者定義函式 (UDF) 的 .js 檔案 Cloud Storage URI。例如:gs://your-bucket/your-transforms/*.js
  • javascriptTextTransformFunctionName:您要使用的 JavaScript 使用者定義函式 (UDF) 名稱。舉例來說,如果您的 JavaScript 函式程式碼是 myTransform(inJson) { /*...do stuff...*/ },則函式名稱就是 myTransform。如需 JavaScript UDF 範例,請參閱 UDF 範例 (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)。例如:transform_udf1
  • bigQueryLoadingTemporaryDirectory:BigQuery 載入程序的暫存目錄。例如:gs://your-bucket/your-files/temp-dir

選用參數

  • outputDeadletterTable:無法到達輸出資料表的訊息資料表。如果資料表不存在,系統會在管道執行期間建立。如未指定,則會使用 <outputTableSpec>_error_records。例如:<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>
  • useStorageWriteApiAtLeastOnce:只有在啟用 Use BigQuery Storage Write API 時,這個參數才會生效。如果啟用,系統會對 Storage Write API 使用至少一次語意,否則會使用單次語意。預設值為 false。
  • useStorageWriteApi:如果為 true,管道會使用 BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api)。預設值為 false。詳情請參閱「使用 Storage Write API」(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)。
  • numStorageWriteApiStreams:使用 Storage Write API 時,指定寫入串流的數量。如果 useStorageWriteApitrue,且 useStorageWriteApiAtLeastOncefalse,則必須設定這個參數。預設值為 0。
  • storageWriteApiTriggeringFrequencySec:使用 Storage Write API 時,指定觸發頻率 (以秒為單位)。如果 useStorageWriteApitrue,且 useStorageWriteApiAtLeastOncefalse,則必須設定這個參數。
  • pythonExternalTextTransformGcsPath:包含使用者定義函式的 Python 程式碼的 Cloud Storage 路徑模式。例如:gs://your-bucket/your-function.py
  • javascriptTextTransformReloadIntervalMinutes:指定重新載入 UDF 的頻率 (以分鐘為單位)。如果值大於 0,Dataflow 會定期檢查 Cloud Storage 中的 UDF 檔案,並在檔案經過修改時重新載入 UDF。您可以在管道執行期間更新 UDF,不必重新啟動工作。如果值為 0,系統會停用 UDF 重新載入功能。預設值為 0

使用者定義函式

這個範本需要 UDF 來剖析輸入檔案,詳情請參閱「管道需求」。範本會針對每個輸入檔案中的每一行文字呼叫 UDF。如要進一步瞭解如何建立 UDF,請參閱「為 Dataflow 範本建立使用者定義函式」。

函式規格

UDF 的規格如下:

  • 輸入:輸入檔案中的單行文字。
  • 輸出:符合 BigQuery 目的地資料表結構定義的 JSON 字串。

執行範本

控制台

  1. 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。
  2. 前往「依據範本建立工作」
  3. 在「工作名稱」欄位中,輸入專屬工作名稱。
  4. 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為 us-central1

    如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。

  5. 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取 the Cloud Storage Text to BigQuery (Stream) template。
  6. 在提供的參數欄位中輸入參數值。
  7. 按一下「Run Job」(執行工作)

gcloud

在殼層或終端機中執行範本:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Stream_GCS_Text_to_BigQuery_Flex \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

更改下列內容:

  • JOB_NAME: 您選擇的不重複工作名稱
  • REGION_NAME: 您要部署 Dataflow 工作的地區,例如 us-central1
  • VERSION: 您要使用的範本版本

    您可以使用下列值:

  • STAGING_LOCATION:用於暫存本機檔案的位置 (例如 gs://your-bucket/staging)
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 使用者定義函式 (UDF) 名稱

    舉例來說,如果您的 JavaScript 函式程式碼是 myTransform(inJson) { /*...do stuff...*/ },則函式名稱就是 myTransform。如需 JavaScript UDF 範例,請參閱「UDF 範例」。

  • PATH_TO_BIGQUERY_SCHEMA_JSON:指向 JSON 檔案 (內含結構定義) 的 Cloud Storage 路徑
  • PATH_TO_JAVASCRIPT_UDF_FILE: 定義要使用的 JavaScript 使用者定義函式 (UDF) 的 .js 檔案 Cloud Storage URI,例如 gs://my-bucket/my-udfs/my_file.js
  • PATH_TO_TEXT_DATA:文字資料集的 Cloud Storage 路徑
  • BIGQUERY_TABLE:BigQuery 資料表名稱
  • BIGQUERY_UNPROCESSED_TABLE:未處理訊息的 BigQuery 資料表名稱
  • PATH_TO_TEMP_DIR_ON_GCS:暫時目錄的 Cloud Storage 路徑

API

如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Stream_GCS_Text_to_BigQuery_Flex",
   }
}

更改下列內容:

  • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
  • JOB_NAME: 您選擇的不重複工作名稱
  • LOCATION: 您要部署 Dataflow 工作的地區,例如 us-central1
  • VERSION: 您要使用的範本版本

    您可以使用下列值:

  • STAGING_LOCATION:用於暫存本機檔案的位置 (例如 gs://your-bucket/staging)
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 使用者定義函式 (UDF) 名稱

    舉例來說,如果您的 JavaScript 函式程式碼是 myTransform(inJson) { /*...do stuff...*/ },則函式名稱就是 myTransform。如需 JavaScript UDF 範例,請參閱「UDF 範例」。

  • PATH_TO_BIGQUERY_SCHEMA_JSON:指向 JSON 檔案 (內含結構定義) 的 Cloud Storage 路徑
  • PATH_TO_JAVASCRIPT_UDF_FILE: 定義要使用的 JavaScript 使用者定義函式 (UDF) 的 .js 檔案 Cloud Storage URI,例如 gs://my-bucket/my-udfs/my_file.js
  • PATH_TO_TEXT_DATA:文字資料集的 Cloud Storage 路徑
  • BIGQUERY_TABLE:BigQuery 資料表名稱
  • BIGQUERY_UNPROCESSED_TABLE:未處理訊息的 BigQuery 資料表名稱
  • PATH_TO_TEMP_DIR_ON_GCS:暫時目錄的 Cloud Storage 路徑

後續步驟