Pub/Sub to MongoDB with Python UDFs 範本

「Pub/Sub 到 MongoDB (使用 Python UDF)」範本是一個串流管道,可從 Pub/Sub 訂閱項目讀取 JSON 編碼的訊息,並以文件形式寫入 MongoDB。如有需要,這個管道支援其他轉換作業,您可以使用 Python 使用者定義函式 (UDF) 納入這些轉換。

如果處理記錄時發生錯誤,範本會將錯誤連同輸入訊息一併寫入 BigQuery 資料表。舉例來說,如果結構定義不相符、JSON 格式不正確,或是在執行轉換作業時發生錯誤,在 deadletterTable 參數中指定資料表名稱。如果資料表不存在,管道會自動建立。

管道相關規定

  • Pub/Sub 訂閱項目必須存在,且訊息必須以有效的 JSON 格式編碼。
  • MongoDB 叢集必須存在,且應能透過 Dataflow 工作站機器存取。

範本參數

參數 說明
inputSubscription Pub/Sub 訂閱項目名稱。例如:projects/my-project-id/subscriptions/my-subscription-id
mongoDBUri 以半形逗號分隔的 MongoDB 伺服器清單。例如:192.285.234.12:27017,192.287.123.11:27017
database MongoDB 中的資料庫,用於儲存集合。例如 my-db
collection MongoDB 資料庫中的集合名稱。例如 my-collection
deadletterTable BigQuery 資料表,用於儲存因失敗 (結構定義不相符、JSON 格式錯誤等) 而無法傳送的訊息。例如 project-id:dataset-name.table-name
pythonExternalTextTransformGcsPath 選用: 定義要使用的使用者定義函式 (UDF) 的 Python 程式碼檔案 Cloud Storage URI。例如:gs://my-bucket/my-udfs/my_file.py
pythonExternalTextTransformFunctionName 選用: 要使用的 Python 使用者定義函式 (UDF) 名稱。
batchSize 選用:將文件批次插入 MongoDB 時使用的批次大小。預設值:1000
batchSizeBytes 選用:批次大小 (以位元組為單位)。預設值:5242880
maxConnectionIdleTime 選用:連線逾時前允許的最大閒置時間 (以秒為單位)。預設值:60000
sslEnabled 選用:布林值,指出是否啟用與 MongoDB 的 SSL 連線。預設值:true
ignoreSSLCertificate 選用:表示是否應忽略 SSL 憑證的布林值。預設值:true
withOrdered 選用:布林值,可啟用依序大量插入 MongoDB 的作業。預設值:true
withSSLInvalidHostNameAllowed 選用:布林值,指出是否允許無效的主機名稱用於 SSL 連線。預設值:true

使用者定義函式

您可以視需要撰寫使用者定義函式 (UDF) 來擴充這個範本。範本會針對每個輸入元素呼叫 UDF。元素酬載會序列化為 JSON 字串。詳情請參閱「為 Dataflow 範本建立使用者定義函式」。

函式規格

UDF 的規格如下:

  • 輸入:輸入 CSV 檔案中的單行。
  • 輸出:要插入 MongoDB 的字串化 JSON 文件。

執行範本

控制台

  1. 前往 Dataflow 的「Create job from template」(透過範本建立工作) 頁面。
  2. 前往「依據範本建立工作」
  3. 在「工作名稱」欄位中,輸入專屬工作名稱。
  4. 選用:如要使用區域端點,請從下拉式選單中選取值。預設區域為 us-central1

    如需可執行 Dataflow 工作的地區清單,請參閱「Dataflow 位置」。

  5. 從「Dataflow template」(Dataflow 範本) 下拉式選單中選取 the Pub/Sub to MongoDB with Python UDFs template。
  6. 在提供的參數欄位中輸入參數值。
  7. 按一下「Run Job」(執行工作)

gcloud

在殼層或終端機中執行範本:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_MongoDB_Xlang \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

更改下列內容:

  • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
  • REGION_NAME: 您要部署 Dataflow 工作的地區,例如 us-central1
  • JOB_NAME: 您選擇的不重複工作名稱
  • VERSION: 您要使用的範本版本

    您可以使用下列值:

  • INPUT_SUBSCRIPTION:Pub/Sub 訂閱項目 (例如 projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI:MongoDB 伺服器位址 (例如 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE:MongoDB 資料庫的名稱 (例如 users)
  • COLLECTION:MongoDB 集合的名稱 (例如 profiles)
  • UNPROCESSED_TABLE:BigQuery 資料表的名稱 (例如 your-project:your-dataset.your-table-name)

API

如要使用 REST API 執行範本,請傳送 HTTP POST 要求。如要進一步瞭解 API 和授權範圍,請參閱 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_MongoDB_Xlang",
   }
}
  

更改下列內容:

  • PROJECT_ID: 您要執行 Dataflow 工作的專案 ID Google Cloud
  • LOCATION: 您要部署 Dataflow 工作的地區,例如 us-central1
  • JOB_NAME: 您選擇的不重複工作名稱
  • VERSION: 您要使用的範本版本

    您可以使用下列值:

  • INPUT_SUBSCRIPTION:Pub/Sub 訂閱項目 (例如 projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI:MongoDB 伺服器位址 (例如 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE:MongoDB 資料庫的名稱 (例如 users)
  • COLLECTION:MongoDB 集合的名稱 (例如 profiles)
  • UNPROCESSED_TABLE:BigQuery 資料表的名稱 (例如 your-project:your-dataset.your-table-name)

後續步驟