MongoDB 到 BigQuery 範本

這個範本會建立批次管道,從 MongoDB 讀取文件並寫入 BigQuery。

如要擷取 MongoDB 變更串流資料,可以使用 MongoDB 到 BigQuery (CDC) 範本

管道相關規定

  • 目標 BigQuery 資料集必須存在。
  • Dataflow 工作站機器必須能夠存取來源 MongoDB 執行個體。

輸出格式

輸出記錄的格式取決於 userOption 參數的值。如果 userOptionNONE,輸出內容會採用下列結構定義。source_data 欄位包含 JSON 格式的文件。

  [
    {"name":"id","type":"STRING"},
    {"name":"source_data","type":"STRING"},
    {"name":"timestamp","type":"TIMESTAMP"}
  ]
  

如果 userOptionFLATTEN,管道會將文件扁平化,並將頂層欄位寫入為資料表欄。舉例來說,假設 MongoDB 集合中的文件包含下列欄位:

  • "_id" (string)
  • "title" (string)
  • "genre" (string)

使用 FLATTEN 時,輸出內容會採用下列結構定義。範本會新增 timestamp 欄位。

  [
    {"name":"_id","type":"STRING"},
    {"name":"title","type":"STRING"},
    {"name":"genre","type":"STRING"},
    {"name":"timestamp","type":"TIMESTAMP"}
  ]
  

如果 userOptionJSON,管道會以 BigQuery JSON 格式儲存文件。BigQuery 內建支援使用 JSON 資料型別的 JSON 資料。詳情請參閱在 GoogleSQL 中使用 JSON 資料

範本參數

必要參數

  • mongoDbUri:MongoDB 連線 URI,格式為 mongodb+srv://:@.
  • 資料庫:要從中讀取集合的 MongoDB 資料庫。例如:my-db
  • collection:MongoDB 資料庫中的集合名稱。例如:my-collection
  • userOptionFLATTENJSONNONEFLATTEN 會將文件扁平化為單一層級。JSON 會以 BigQuery JSON 格式儲存文件。NONE 會將整份文件儲存為 JSON 格式的字串。預設值為 NONE。
  • outputTableSpec:要寫入的 BigQuery 資料表。例如:bigquery-project:dataset.output_table

選用參數

  • KMSEncryptionKey:可解密 MongoDB URI 連線字串的 Cloud KMS 加密金鑰。如果傳入 Cloud KMS 金鑰,則必須以加密方式傳送 MongoDB URI 連線字串。例如:projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key
  • filter:JSON 格式的 Bson 篩選器。例如:{ "val": { $gt: 0, $lt: 9 }}
  • useStorageWriteApi:如果為 true,管道會使用 BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api)。預設值為 false。詳情請參閱「使用 Storage Write API」(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)。
  • useStorageWriteApiAtLeastOnce:使用 Storage Write API 時,指定寫入語意。如要使用「至少一次」語意 (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics),請將這個參數設為 true。如要使用「僅限一次」語意,請將參數設為 false。只有在 useStorageWriteApitrue 時,這項參數才會生效。預設值為 false
  • bigQuerySchemaPath:BigQuery JSON 結構定義的 Cloud Storage 路徑。例如:gs://your-bucket/your-schema.json
  • javascriptDocumentTransformGcsPath:定義要使用的 JavaScript 使用者定義函式 (UDF) 的 .js 檔案 Cloud Storage URI。例如:gs://your-bucket/your-transforms/*.js
  • javascriptDocumentTransformFunctionName:要使用的 JavaScript 使用者定義函式 (UDF) 名稱。舉例來說,如果您的 JavaScript 函式程式碼是 myTransform(inJson) { /*...do stuff...*/ },則函式名稱就是 myTransform。如需 JavaScript UDF 範例,請參閱 UDF 範例 (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)。例如:transform

使用者定義函式

您也可以選擇以 JavaScript 編寫使用者定義函式 (UDF) 來擴充這個範本。範本會針對每個輸入元素呼叫 UDF。 元素酬載會序列化為 JSON 字串。

如要使用 UDF,請將 JavaScript 檔案上傳至 Cloud Storage,並設定下列範本參數:

參數說明
javascriptDocumentTransformGcsPath JavaScript 檔案的 Cloud Storage 位置。
javascriptDocumentTransformFunctionName JavaScript 函式的名稱。

詳情請參閱「為 Dataflow 範本建立使用者定義函式」。

函式規格

UDF 的規格如下:

  • 輸入:MongoDB 文件。
  • 輸出:序列化為 JSON 字串的物件。如果 userOptionNONE,JSON 物件必須包含名為 _id 的屬性,其中包含文件 ID。
  • 執行範本

    控制台

    1. 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。
    2. 前往「依據範本建立工作」
    3. 在「工作名稱」欄位中,輸入專屬工作名稱。
    4. 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為 us-central1

      如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。

    5. 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取 the MongoDB to BigQuery template。
    6. 在提供的參數欄位中輸入參數值。
    7. 按一下「Run Job」(執行工作)

    gcloud

    在殼層或終端機中執行範本:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery \
        --parameters \
    outputTableSpec=OUTPUT_TABLE_SPEC,\
    mongoDbUri=MONGO_DB_URI,\
    database=DATABASE,\
    collection=COLLECTION,\
    userOption=USER_OPTION

    更改下列內容:

    • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
    • JOB_NAME: 您選擇的不重複工作名稱
    • REGION_NAME: 您要部署 Dataflow 工作的地區,例如 us-central1
    • VERSION: 您要使用的範本版本

      您可以使用下列值:

    • OUTPUT_TABLE_SPEC:目標 BigQuery 資料表名稱。
    • MONGO_DB_URI:您的 MongoDB URI。
    • DATABASE:您的 MongoDB 資料庫。
    • COLLECTION:您的 MongoDB 集合。
    • USER_OPTION:FLATTEN、JSON 或 NONE。

    API

    如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
              "inputTableSpec": "INPUT_TABLE_SPEC",
              "mongoDbUri": "MONGO_DB_URI",
              "database": "DATABASE",
              "collection": "COLLECTION",
              "userOption": "USER_OPTION"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery",
       }
    }

    更改下列內容:

    • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
    • JOB_NAME: 您選擇的不重複工作名稱
    • LOCATION: 您要部署 Dataflow 工作的地區,例如 us-central1
    • VERSION: 您要使用的範本版本

      您可以使用下列值:

    • OUTPUT_TABLE_SPEC:目標 BigQuery 資料表名稱。
    • MONGO_DB_URI:您的 MongoDB URI。
    • DATABASE:您的 MongoDB 資料庫。
    • COLLECTION:您的 MongoDB 集合。
    • USER_OPTION:FLATTEN、JSON 或 NONE。

    後續步驟