這個範本會建立批次管道,從 MongoDB 讀取文件並寫入 BigQuery。
如要擷取 MongoDB 變更串流資料,可以使用 MongoDB 到 BigQuery (CDC) 範本。
管道相關規定
- 目標 BigQuery 資料集必須存在。
- Dataflow 工作站機器必須能夠存取來源 MongoDB 執行個體。
輸出格式
輸出記錄的格式取決於 userOption
參數的值。如果 userOption
是 NONE
,輸出內容會採用下列結構定義。source_data
欄位包含 JSON 格式的文件。
[ {"name":"id","type":"STRING"}, {"name":"source_data","type":"STRING"}, {"name":"timestamp","type":"TIMESTAMP"} ]
如果 userOption
為 FLATTEN
,管道會將文件扁平化,並將頂層欄位寫入為資料表欄。舉例來說,假設 MongoDB 集合中的文件包含下列欄位:
"_id"
(string
)"title"
(string
)"genre"
(string
)
使用 FLATTEN
時,輸出內容會採用下列結構定義。範本會新增 timestamp
欄位。
[ {"name":"_id","type":"STRING"}, {"name":"title","type":"STRING"}, {"name":"genre","type":"STRING"}, {"name":"timestamp","type":"TIMESTAMP"} ]
如果 userOption
為 JSON
,管道會以 BigQuery JSON 格式儲存文件。BigQuery 內建支援使用 JSON 資料型別的 JSON 資料。詳情請參閱在 GoogleSQL 中使用 JSON 資料。
範本參數
必要參數
- mongoDbUri:MongoDB 連線 URI,格式為
mongodb+srv://:@.
。 - 資料庫:要從中讀取集合的 MongoDB 資料庫。例如:
my-db
。 - collection:MongoDB 資料庫中的集合名稱。例如:
my-collection
。 - userOption:
FLATTEN
、JSON
或NONE
。FLATTEN
會將文件扁平化為單一層級。JSON
會以 BigQuery JSON 格式儲存文件。NONE
會將整份文件儲存為 JSON 格式的字串。預設值為 NONE。 - outputTableSpec:要寫入的 BigQuery 資料表。例如:
bigquery-project:dataset.output_table
。
選用參數
- KMSEncryptionKey:可解密 MongoDB URI 連線字串的 Cloud KMS 加密金鑰。如果傳入 Cloud KMS 金鑰,則必須以加密方式傳送 MongoDB URI 連線字串。例如:
projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key
。 - filter:JSON 格式的 Bson 篩選器。例如:
{ "val": { $gt: 0, $lt: 9 }}
。 - useStorageWriteApi:如果為
true
,管道會使用 BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api)。預設值為false
。詳情請參閱「使用 Storage Write API」(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)。 - useStorageWriteApiAtLeastOnce:使用 Storage Write API 時,指定寫入語意。如要使用「至少一次」語意 (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics),請將這個參數設為
true
。如要使用「僅限一次」語意,請將參數設為false
。只有在useStorageWriteApi
為true
時,這項參數才會生效。預設值為false
。 - bigQuerySchemaPath:BigQuery JSON 結構定義的 Cloud Storage 路徑。例如:
gs://your-bucket/your-schema.json
。 - javascriptDocumentTransformGcsPath:定義要使用的 JavaScript 使用者定義函式 (UDF) 的
.js
檔案 Cloud Storage URI。例如:gs://your-bucket/your-transforms/*.js
。 - javascriptDocumentTransformFunctionName:要使用的 JavaScript 使用者定義函式 (UDF) 名稱。舉例來說,如果您的 JavaScript 函式程式碼是
myTransform(inJson) { /*...do stuff...*/ }
,則函式名稱就是 myTransform。如需 JavaScript UDF 範例,請參閱 UDF 範例 (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)。例如:transform
。
使用者定義函式
您也可以選擇以 JavaScript 編寫使用者定義函式 (UDF) 來擴充這個範本。範本會針對每個輸入元素呼叫 UDF。 元素酬載會序列化為 JSON 字串。
如要使用 UDF,請將 JavaScript 檔案上傳至 Cloud Storage,並設定下列範本參數:
參數 | 說明 |
---|---|
javascriptDocumentTransformGcsPath |
JavaScript 檔案的 Cloud Storage 位置。 |
javascriptDocumentTransformFunctionName |
JavaScript 函式的名稱。 |
詳情請參閱「為 Dataflow 範本建立使用者定義函式」。
函式規格
UDF 的規格如下:
userOption
為 NONE
,JSON 物件必須包含名為 _id
的屬性,其中包含文件 ID。執行範本
控制台
- 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。 前往「依據範本建立工作」
- 在「工作名稱」欄位中,輸入專屬工作名稱。
- 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為
us-central1
。如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。
- 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取 the MongoDB to BigQuery template。
- 在提供的參數欄位中輸入參數值。
- 按一下「Run Job」(執行工作)。
gcloud
在殼層或終端機中執行範本:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery \ --parameters \ outputTableSpec=OUTPUT_TABLE_SPEC,\ mongoDbUri=MONGO_DB_URI,\ database=DATABASE,\ collection=COLLECTION,\ userOption=USER_OPTION
更改下列內容:
PROJECT_ID
: 您要執行 Dataflow 工作的專案 ID Google CloudJOB_NAME
: 您選擇的不重複工作名稱REGION_NAME
: 您要部署 Dataflow 工作的地區,例如us-central1
VERSION
: 您要使用的範本版本您可以使用下列值:
latest
,使用範本的最新版本,該版本位於值區中非依日期命名的上層資料夾:gs://dataflow-templates-REGION_NAME/latest/- 版本名稱 (例如
2023-09-12-00_RC00
),用於指定範本版本,該版本會以巢狀結構存放在值區中依日期命名的上層資料夾中:gs://dataflow-templates-REGION_NAME/
OUTPUT_TABLE_SPEC
:目標 BigQuery 資料表名稱。MONGO_DB_URI
:您的 MongoDB URI。DATABASE
:您的 MongoDB 資料庫。COLLECTION
:您的 MongoDB 集合。USER_OPTION
:FLATTEN、JSON 或 NONE。
API
如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputTableSpec": "INPUT_TABLE_SPEC", "mongoDbUri": "MONGO_DB_URI", "database": "DATABASE", "collection": "COLLECTION", "userOption": "USER_OPTION" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery", } }
更改下列內容:
PROJECT_ID
: 您要執行 Dataflow 工作的專案 ID Google CloudJOB_NAME
: 您選擇的不重複工作名稱LOCATION
: 您要部署 Dataflow 工作的地區,例如us-central1
VERSION
: 您要使用的範本版本您可以使用下列值:
latest
,使用範本的最新版本,該版本位於值區中非依日期命名的上層資料夾:gs://dataflow-templates-REGION_NAME/latest/- 版本名稱 (例如
2023-09-12-00_RC00
),用於指定範本版本,該版本會以巢狀結構存放在值區中依日期命名的上層資料夾中:gs://dataflow-templates-REGION_NAME/
OUTPUT_TABLE_SPEC
:目標 BigQuery 資料表名稱。MONGO_DB_URI
:您的 MongoDB URI。DATABASE
:您的 MongoDB 資料庫。COLLECTION
:您的 MongoDB 集合。USER_OPTION
:FLATTEN、JSON 或 NONE。
後續步驟
- 瞭解 Dataflow 範本。
- 請參閱 Google 提供的範本清單。