MongoDB to BigQuery テンプレート(Stream)

このテンプレートは、MongoDB 変更ストリームと連携するストリーミング パイプラインを作成します。このテンプレートを使用するには、変更ストリーム データを Pub/Sub に公開します。パイプラインは Pub/Sub から JSON レコードを読み取り、BigQuery に書き込みます。BigQuery に書き込まれるレコードは、MongoDB to BigQuery バッチ テンプレートと同じ形式になります。

パイプラインの要件

  • ターゲット BigQuery データセットが存在すること。
  • ソース MongoDB インスタンスに Dataflow ワーカーマシンからアクセスできること。
  • 変更ストリームを読み取るには、Pub/Sub トピックを作成する必要があります。パイプラインの実行中に、MongoDB 変更ストリームで変更データ キャプチャ(CDC)イベントをリッスンし、それらを JSON レコードとして Pub/Sub に公開します。Pub/Sub へのメッセージのパブリッシュの詳細については、メッセージをトピックに公開するをご覧ください。
  • このテンプレートでは MongoDB 変更ストリームを使用します。BigQuery の変更データ キャプチャはサポートされていません。

テンプレートのパラメータ

必須パラメータ

  • mongoDbUri: MongoDB 接続 URI。形式は mongodb+srv://:@.
  • database: コレクションを読み取る MongoDB 内のデータベース(例: my-db)。
  • collection: MongoDB データベース内のコレクションの名前(例: my-collection)。
  • userOption : FLATTENJSONNONEFLATTEN は、ドキュメントを単一レベルにフラット化します。JSON は、ドキュメントを BigQuery JSON 形式で保存します。NONE は、ドキュメント全体を JSON 形式の STRING として保存します。デフォルトは NONE です。
  • inputTopic: 読み取る Pub/Sub 入力トピック。projects/<PROJECT_ID>/topics/<TOPIC_NAME> の形式で指定します。
  • outputTableSpec: 書き込み先の BigQuery テーブル。たとえば、bigquery-project:dataset.output_table のようにします。

オプション パラメータ

  • useStorageWriteApiAtLeastOnce: Storage Write API を使用する場合は、書き込みセマンティクスを指定します。at-least-once セマンティクス(https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics)を使用するには、このパラメータを true に設定します。exactly-once セマンティクスを使用するには、パラメータを false に設定します。このパラメータは、useStorageWriteApitrue の場合にのみ適用されます。デフォルト値は false です。
  • KMSEncryptionKey: MongoDB URI 接続文字列を復号するための Cloud KMS 暗号鍵。Cloud KMS 鍵が渡された場合、MongoDB URI 接続文字列はすべて暗号化されて渡されます(例: projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key)。
  • filter : JSON 形式の Bson フィルタ。(例: { "val": { $gt: 0, $lt: 9 }})。
  • useStorageWriteApi: true の場合、パイプラインは BigQuery Storage Write API(https://cloud.google.com/bigquery/docs/write-api)を使用します。デフォルト値は false です。詳細については、Storage Write API の使用(https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api)をご覧ください。
  • numStorageWriteApiStreams: Storage Write API を使用する場合は、書き込みストリームの数を指定します。useStorageWriteApitrue であり、useStorageWriteApiAtLeastOncefalse の場合に、このパラメータを設定する必要があります。デフォルト値は 0 です。
  • storageWriteApiTriggeringFrequencySec: Storage Write API を使用する場合は、トリガーの頻度を秒単位で指定します。useStorageWriteApitrue であり、useStorageWriteApiAtLeastOncefalse の場合に、このパラメータを設定する必要があります。
  • bigQuerySchemaPath: BigQuery JSON スキーマの Cloud Storage パス。(例: gs://your-bucket/your-schema.json)。
  • javascriptDocumentTransformGcsPath: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI(例: gs://your-bucket/your-transforms/*.js)。
  • javascriptDocumentTransformFunctionName: 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数が myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例(https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)をご覧ください(例: transform)。

ユーザー定義関数

必要であれば、JavaScript でユーザー定義関数(UDF)を記述して、このテンプレートを拡張できます。このテンプレートは入力要素ごとに UDF を呼び出します。要素のペイロードは、JSON 文字列としてシリアル化されます。

UDF を使用するには、JavaScript ファイルを Cloud Storage にアップロードし、次のテンプレート パラメータを設定します。

パラメータ説明
javascriptDocumentTransformGcsPath JavaScript ファイルの Cloud Storage の場所。
javascriptDocumentTransformFunctionName JavaScript 関数の名前。

詳細については、Dataflow テンプレートのユーザー定義関数を作成するをご覧ください。

関数の仕様

UDF の仕様は次のとおりです。

  • 入力: MongoDB ドキュメント。
  • 出力: JSON 文字列としてシリアル化されたオブジェクト。
  • テンプレートを実行する

    コンソール

    1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
    2. [テンプレートからジョブを作成] に移動
    3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
    4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

      Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

    5. [Dataflow テンプレート] プルダウン メニューから、[ the MongoDB (CDC) to BigQuery template] を選択します。
    6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
    7. [ジョブを実行] をクリックします。

    gcloud

    シェルまたはターミナルで、テンプレートを実行します。

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery_CDC \
        --parameters \
    outputTableSpec=OUTPUT_TABLE_SPEC,\
    mongoDbUri=MONGO_DB_URI,\
    database=DATABASE,\
    collection=COLLECTION,\
    userOption=USER_OPTION,\
    inputTopic=INPUT_TOPIC

    次のように置き換えます。

    • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
    • JOB_NAME: 一意の任意のジョブ名
    • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
    • VERSION: 使用するテンプレートのバージョン

      使用できる値は次のとおりです。

      • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
      • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
    • OUTPUT_TABLE_SPEC: ターゲット BigQuery テーブル名。
    • MONGO_DB_URI: MongoDB URI。
    • DATABASE: MongoDB データベース。
    • COLLECTION: MongoDB コレクション。
    • USER_OPTION: FLATTEN、JSON、NONE。
    • INPUT_TOPIC: Pub/Sub 入力トピック。

    API

    REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
              "inputTableSpec": "INPUT_TABLE_SPEC",
              "mongoDbUri": "MONGO_DB_URI",
              "database": "DATABASE",
              "collection": "COLLECTION",
              "userOption": "USER_OPTION",
              "inputTopic": "INPUT_TOPIC"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery_CDC",
       }
    }

    次のように置き換えます。

    • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
    • JOB_NAME: 一意の任意のジョブ名
    • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
    • VERSION: 使用するテンプレートのバージョン

      使用できる値は次のとおりです。

      • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
      • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
    • OUTPUT_TABLE_SPEC: ターゲット BigQuery テーブル名。
    • MONGO_DB_URI: MongoDB URI。
    • DATABASE: MongoDB データベース。
    • COLLECTION: MongoDB コレクション。
    • USER_OPTION: FLATTEN、JSON、NONE。
    • INPUT_TOPIC: Pub/Sub 入力トピック。

    次のステップ