Pub/Sub 到 MongoDB 範本

「Pub/Sub 到 MongoDB 範本」是一個串流管道,可從 Pub/Sub 訂閱項目讀取 JSON 編碼的訊息,並以文件形式寫入 MongoDB。如有需要,這個管道支援其他轉換作業,您可以使用 JavaScript 使用者定義函式 (UDF) 納入這些轉換。

如果處理記錄時發生錯誤,範本會將錯誤連同輸入訊息一併寫入 BigQuery 資料表。舉例來說,如果結構定義不相符、JSON 格式不正確,或是在執行轉換作業時發生錯誤,在 deadletterTable 參數中指定資料表名稱。如果資料表不存在,管道會自動建立。

管道相關規定

  • Pub/Sub 訂閱項目必須存在,且訊息必須以有效的 JSON 格式編碼。
  • MongoDB 叢集必須存在,且應能透過 Dataflow 工作站機器存取。

範本參數

必要參數

  • inputSubscription:Pub/Sub 訂閱項目的名稱。例如:projects/your-project-id/subscriptions/your-subscription-name
  • mongoDBUri:以半形逗號分隔的 MongoDB 伺服器清單。例如:host1:port,host2:port,host3:port
  • 資料庫:MongoDB 中用於儲存集合的資料庫。例如:my-db
  • collection:MongoDB 資料庫中的集合名稱。例如:my-collection
  • deadletterTable:BigQuery 資料表,用於儲存因失敗而導致的訊息,例如結構定義不符、JSON 格式錯誤等。例如:your-project-id:your-dataset.your-table-name

選用參數

  • batchSize:用於將文件批次插入 MongoDB 的批次大小。預設值為 1000。
  • batchSizeBytes:批次大小 (以位元組為單位)。預設值為 5242880。
  • maxConnectionIdleTime:連線逾時前允許的最大閒置時間 (以秒為單位)。預設值為 60000。
  • sslEnabled:布林值,指出與 MongoDB 的連線是否已啟用 SSL。預設值為 true。
  • ignoreSSLCertificate:布林值,指出是否要忽略 SSL 憑證。預設值為 true。
  • withOrdered:布林值,可啟用依序大量插入 MongoDB 的功能。預設值為 true。
  • withSSLInvalidHostNameAllowed:布林值,指出是否允許 SSL 連線使用無效的主機名稱。預設值為 true。
  • javascriptTextTransformGcsPath:定義要使用的 JavaScript 使用者定義函式 (UDF) 的 .js 檔案 Cloud Storage URI。例如:gs://my-bucket/my-udfs/my_file.js
  • javascriptTextTransformFunctionName:要使用的 JavaScript 使用者定義函式 (UDF) 名稱。舉例來說,如果您的 JavaScript 函式程式碼是 myTransform(inJson) { /*...do stuff...*/ },則函式名稱就是 myTransform。如需 JavaScript UDF 範例,請參閱 UDF 範例 (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)。
  • javascriptTextTransformReloadIntervalMinutes:指定重新載入 UDF 的頻率 (以分鐘為單位)。如果值大於 0,Dataflow 會定期檢查 Cloud Storage 中的 UDF 檔案,並在檔案經過修改時重新載入 UDF。您可以在管道執行期間更新 UDF,不必重新啟動工作。如果值為 0,系統會停用 UDF 重新載入功能。預設值為 0

使用者定義函式

您可以視需要撰寫使用者定義函式 (UDF) 來擴充這個範本。範本會針對每個輸入元素呼叫 UDF。元素酬載會序列化為 JSON 字串。詳情請參閱「為 Dataflow 範本建立使用者定義函式」。

函式規格

UDF 的規格如下:

  • 輸入:輸入 CSV 檔案中的單行。
  • 輸出:要插入 MongoDB 的字串化 JSON 文件。

執行範本

控制台

  1. 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。
  2. 前往「依據範本建立工作」
  3. 在「工作名稱」欄位中,輸入專屬工作名稱。
  4. 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為 us-central1

    如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。

  5. 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取 the Pub/Sub to MongoDB template。
  6. 在提供的參數欄位中輸入參數值。
  7. 按一下「Run Job」(執行工作)

gcloud

在殼層或終端機中執行範本:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

更改下列內容:

  • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
  • REGION_NAME: 您要部署 Dataflow 工作的地區,例如 us-central1
  • JOB_NAME: 您選擇的不重複工作名稱
  • VERSION: 您要使用的範本版本

    您可以使用下列值:

  • INPUT_SUBSCRIPTION:Pub/Sub 訂閱項目 (例如 projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI:MongoDB 伺服器位址 (例如 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE:MongoDB 資料庫的名稱 (例如 users)
  • COLLECTION:MongoDB 集合的名稱 (例如 profiles)
  • UNPROCESSED_TABLE:BigQuery 資料表的名稱 (例如 your-project:your-dataset.your-table-name)

API

如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

更改下列內容:

  • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
  • LOCATION: 您要部署 Dataflow 工作的地區,例如 us-central1
  • JOB_NAME: 您選擇的不重複工作名稱
  • VERSION: 您要使用的範本版本

    您可以使用下列值:

  • INPUT_SUBSCRIPTION:Pub/Sub 訂閱項目 (例如 projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI:MongoDB 伺服器位址 (例如 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE:MongoDB 資料庫的名稱 (例如 users)
  • COLLECTION:MongoDB 集合的名稱 (例如 profiles)
  • UNPROCESSED_TABLE:BigQuery 資料表的名稱 (例如 your-project:your-dataset.your-table-name)

後續步驟