BigQuery to Elasticsearch テンプレート

BigQuery to Elasticsearch テンプレートは、BigQuery テーブルから Elasticsearch にデータをドキュメントとして取り込むバッチ パイプラインです。テンプレートでは、テーブル全体を読み取ることも、クエリを使用して特定のレコードを読み取ることもできます。

  • ソース BigQuery テーブルが存在すること。
  • Google Cloud インスタンス上または Elasticsearch バージョン 7.0 以降の Elastic Cloud 上の Elasticsearch ホスト。Dataflow ワーカーマシンからアクセス可能であること。

テンプレートのパラメータ

必須パラメータ

  • connectionUrl: Elasticsearch URL(https://hostname:[port] 形式)。Elastic Cloud を使用している場合は、CloudID を指定します例: https://elasticsearch-host:9200
  • apiKey: 認証に使用する Base64 でエンコードされた API キー。
  • index: リクエストが発行される Elasticsearch インデックス。例: my-index

オプション パラメータ

  • inputTableSpec: 読み取り元の BigQuery テーブル。inputTableSpec を指定すると、テンプレートは BigQuery Storage Read API(https://cloud.google.com/bigquery/docs/reference/storage)を使用して、BigQuery ストレージから直接データを読み取ります。Storage Read API の制限については、https://cloud.google.com/bigquery/docs/reference/storage#limitations をご覧ください。inputTableSpec または query を指定する必要があります。両方のパラメータを設定した場合、テンプレートは query パラメータを使用します。(例: <BIGQUERY_PROJECT>:<DATASET_NAME>.<INPUT_TABLE>)。
  • outputDeadletterTable: 出力テーブルに到達できなかったメッセージの BigQuery テーブル。テーブルが存在しない場合は、パイプラインの実行中に作成されます。指定しない場合は <outputTableSpec>_error_records が使用されます例: <PROJECT_ID>:<DATASET_NAME>.<DEADLETTER_TABLE>
  • query: BigQuery からデータを読み取るために使用する SQL クエリ。BigQuery データセットが Dataflow ジョブとは異なるプロジェクトにある場合は、SQL クエリで完全なデータセット名を指定します(例: <PROJECT_ID>.<DATASET_NAME>.<TABLE_NAME>)。デフォルトでは、useLegacySqltrue でない限り、query パラメータは GoogleSQL(https://cloud.google.com/bigquery/docs/introduction-sql)を使用します。inputTableSpec または query を指定する必要があります。両方のパラメータを設定した場合、テンプレートは query パラメータを使用します例: select * from sampledb.sample_table
  • useLegacySql: レガシー SQL を使用するには true に設定します。このパラメータは、query パラメータを使用する場合にのみ適用されます。デフォルトは false です。
  • queryLocation: 基となるテーブルの権限なしで承認済みビューから読み取る場合は必須です。例: US
  • elasticsearchUsername: 認証に使用する Elasticsearch のユーザー名。指定すると、apiKey の値は無視されます。
  • elasticsearchPassword: 認証に使用する Elasticsearch のパスワード。指定すると、apiKey の値は無視されます。
  • batchSize: バッチサイズ(ドキュメント数)。デフォルトは 1000 です。
  • batchSizeBytes: バッチサイズ(バイト数)。デフォルトは 5242880(5 MB)です。
  • maxRetryAttempts: 再試行の最大回数。0 より大きい値にする必要があります。デフォルトは no retries です。
  • maxRetryDuration: 最大再試行時間(ミリ秒)。0 より大きい値にする必要があります。デフォルトは no retries です。
  • propertyAsIndex: インデックスに登録されているドキュメント内のプロパティ。このプロパティの値は、一括リクエストでドキュメントに含まれる _index メタデータを指定します。_index UDF よりも優先されます。デフォルトは none です。
  • javaScriptIndexFnGcsPath: 一括リクエストでドキュメントに含まれる _index メタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルトは none です。
  • javaScriptIndexFnName: 一括リクエストでドキュメントに含める _index メタデータを指定する UDF JavaScript 関数の名前。デフォルトは none です。
  • propertyAsId: インデックスに登録されているドキュメント内のプロパティ。このプロパティの値は、一括リクエストでドキュメントに含まれる _id メタデータを指定します。_id UDF よりも優先されます。デフォルトは none です。
  • javaScriptIdFnGcsPath: 一括リクエストでドキュメントに含まれる _id メタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルトは none です。
  • javaScriptIdFnName: 一括リクエストでドキュメントに含める _id メタデータを指定する UDF JavaScript 関数の名前。デフォルトは none です。
  • javaScriptTypeFnGcsPath: 一括リクエストでドキュメントに含まれる _type メタデータを指定する関数の JavaScript UDF ソースへの Cloud Storage パス。デフォルトは none です。
  • javaScriptTypeFnName: 一括リクエストでドキュメントに含まれる _type メタデータを指定する UDF JavaScript 関数の名前。デフォルトは none です。
  • javaScriptIsDeleteFnGcsPath: ドキュメントを挿入や更新ではなく削除するかどうかを決定する関数の JavaScript UDF ソースへの Cloud Storage パス。この関数は、文字列値 true または false を返します。デフォルトは none です。
  • javaScriptIsDeleteFnName: ドキュメントを挿入または更新する代わりに削除するかどうかを決定する UDF JavaScript 関数の名前。この関数は、文字列値 true または false を返します。デフォルトは none です。
  • usePartialUpdate: Elasticsearch リクエストで部分的な更新(作成やインデックス作成ではなく更新、部分的なドキュメントを許可する)を使用するかどうか。デフォルトは false です。
  • bulkInsertMethod: INDEX(インデックス、upserts を許可する)または CREATE(作成、duplicate _id でエラー)を Elasticsearch 一括リクエストで使用するかどうか。デフォルトは CREATE です。
  • trustSelfSignedCerts: 自己署名証明書を信頼するかどうか。インストールされた Elasticsearch インスタンスに自己署名証明書が存在する場合があります。SSL 証明書の検証をバイパスするには、この値を true に設定します。(デフォルト: false)。
  • disableCertificateValidation: true の場合、自己署名 SSL 証明書を信頼します。Elasticsearch インスタンスには自己署名証明書が存在する場合があります。証明書の検証をバイパスするには、このパラメータを true に設定します。デフォルトは false です。
  • apiKeyKMSEncryptionKey: API キーを復号するための Cloud KMS 鍵。apiKeySourceKMS に設定されている場合、このパラメータは必須です。このパラメータを指定する場合は、暗号化された apiKey 文字列を渡します。KMS API 暗号化エンドポイントを使用してパラメータを暗号化します。キーには projects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME> の形式を使用します。https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt をご覧ください(例: projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name)。
  • apiKeySecretId: apiKey の Secret Manager シークレット ID。apiKeySourceSECRET_MANAGER に設定されている場合は、このパラメータを指定します。projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example, projects/your-project-id/secrets/your-secret/versions/your-secret-version` の形式を使用します。
  • apiKeySource: API キーのソース。使用できる値は PLAINTEXTKMSSECRET_MANAGER です。Secret Manager または KMS を使用する場合は、このパラメータが必要です。apiKeySourceKMS に設定されている場合は、apiKeyKMSEncryptionKey と暗号化された apiKey を指定する必要があります。apiKeySourceSECRET_MANAGER に設定されている場合は、apiKeySecretId を指定する必要があります。apiKeySourcePLAINTEXT に設定されている場合は、apiKey を指定する必要があります。デフォルト: PLAINTEXT。
  • socketTimeout: 設定すると、Elastic RestClient のデフォルトの最大再試行タイムアウトとデフォルトのソケット タイムアウト(30000ms)が上書きされます。
  • javascriptTextTransformGcsPath: 使用する JavaScript ユーザー定義関数(UDF)を定義する .js ファイルの Cloud Storage URI例: gs://my-bucket/my-udfs/my_file.js
  • javascriptTextTransformFunctionName: 使用する JavaScript ユーザー定義関数(UDF)の名前。たとえば、JavaScript 関数コードが myTransform(inJson) { /*...do stuff...*/ } の場合、関数名は myTransform です。JavaScript UDF の例については、UDF の例(https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)をご覧ください。

ユーザー定義の関数

次のように、このテンプレートでは、パイプライン内の複数のポイントでユーザー定義関数(UDF)をサポートしています。詳細については、Dataflow テンプレートのユーザー定義関数を作成するをご覧ください。

インデックス関数

ドキュメントが属するインデックスを返します。

テンプレートのパラメータ:

  • javaScriptIndexFnGcsPath: JavaScript ファイルの Cloud Storage URI。
  • javaScriptIndexFnName: JavaScript 関数の名前。

関数の仕様:

  • 入力: JSON 文字列としてシリアル化された Elasticsearch ドキュメント。
  • 出力: ドキュメントの _index メタデータ フィールドの値。

ドキュメント ID 関数

ドキュメント ID を返します。

テンプレートのパラメータ:

  • javaScriptIdFnGcsPath: JavaScript ファイルの Cloud Storage URI。
  • javaScriptIdFnName: JavaScript 関数の名前。

関数の仕様:

  • 入力: JSON 文字列としてシリアル化された Elasticsearch ドキュメント。
  • 出力: ドキュメントの _id メタデータ フィールドの値。

ドキュメント削除関数

ドキュメントを削除するかどうかを指定します。この関数を使用するには、一括挿入モードを INDEX に設定し、ドキュメント ID 関数を指定します。

テンプレートのパラメータ:

  • javaScriptIsDeleteFnGcsPath: JavaScript ファイルの Cloud Storage URI。
  • javaScriptIsDeleteFnName: JavaScript 関数の名前。

関数の仕様:

  • 入力: JSON 文字列としてシリアル化された Elasticsearch ドキュメント。
  • 出力: ドキュメントを削除する場合は文字列 "true" を、ドキュメントをアップサートする場合は "false" を返します。

マッピング タイプ関数

ドキュメントのマッピング型を返します。

テンプレートのパラメータ:

  • javaScriptTypeFnGcsPath: JavaScript ファイルの Cloud Storage URI。
  • javaScriptTypeFnName: JavaScript 関数の名前。

関数の仕様:

  • 入力: JSON 文字列としてシリアル化された Elasticsearch ドキュメント。
  • 出力: ドキュメントの _type メタデータ フィールドの値。

テンプレートを実行する

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the BigQuery to Elasticsearch template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/BigQuery_to_Elasticsearch \
    --parameters \
inputTableSpec=INPUT_TABLE_SPEC,\
connectionUrl=CONNECTION_URL,\
apiKey=APIKEY,\
index=INDEX

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • INPUT_TABLE_SPEC: BigQuery テーブル名
  • CONNECTION_URL: Elasticsearch の URL
  • APIKEY: 認証用に Base64 でエンコードされた API キー
  • INDEX: Elasticsearch インデックス。

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputTableSpec": "INPUT_TABLE_SPEC",
          "connectionUrl": "CONNECTION_URL",
          "apiKey": "APIKEY",
          "index": "INDEX"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/BigQuery_to_Elasticsearch",
   }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • INPUT_TABLE_SPEC: BigQuery テーブル名
  • CONNECTION_URL: Elasticsearch の URL
  • APIKEY: 認証用に Base64 でエンコードされた API キー
  • INDEX: Elasticsearch インデックス。
/*
 * Copyright (C) 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.elasticsearch.templates;

import com.google.cloud.teleport.metadata.MultiTemplate;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.elasticsearch.options.BigQueryToElasticsearchOptions;
import com.google.cloud.teleport.v2.elasticsearch.transforms.WriteToElasticsearch;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters.ReadBigQueryTableRows;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters.TableRowToJsonFn;
import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.TransformTextViaJavascript;
import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer;
import com.google.common.base.Strings;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

/**
 * The {@link BigQueryToElasticsearch} pipeline exports data from a BigQuery table to Elasticsearch.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-elasticsearch/README_BigQuery_to_Elasticsearch.md">README</a>
 * for instructions on how to use or modify this template.
 */
@MultiTemplate({
  @Template(
      name = "BigQuery_to_Elasticsearch",
      category = TemplateCategory.BATCH,
      displayName = "BigQuery to Elasticsearch",
      description =
          "The BigQuery to Elasticsearch template is a batch pipeline that ingests data from a BigQuery table into Elasticsearch as documents. "
              + "The template can either read the entire table or read specific records using a supplied query.",
      optionsClass = BigQueryToElasticsearchOptions.class,
      skipOptions = {
        "javascriptTextTransformReloadIntervalMinutes",
        "pythonExternalTextTransformGcsPath",
        "pythonExternalTextTransformFunctionName"
      },
      flexContainerName = "bigquery-to-elasticsearch",
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/bigquery-to-elasticsearch",
      contactInformation = "https://cloud.google.com/support",
      preview = true,
      requirements = {
        "The source BigQuery table must exist.",
        "A Elasticsearch host on a Google Cloud instance or on Elastic Cloud with Elasticsearch version 7.0 or above and should be accessible from the Dataflow worker machines.",
      }),
  @Template(
      name = "BigQuery_to_Elasticsearch_Xlang",
      category = TemplateCategory.BATCH,
      displayName = "BigQuery to Elasticsearch with Python UDFs",
      type = Template.TemplateType.XLANG,
      description =
          "The BigQuery to Elasticsearch template is a batch pipeline that ingests data from a BigQuery table into Elasticsearch as documents. "
              + "The template can either read the entire table or read specific records using a supplied query.",
      optionsClass = BigQueryToElasticsearchOptions.class,
      skipOptions = {
        "javascriptTextTransformReloadIntervalMinutes",
        "javascriptTextTransformGcsPath",
        "javascriptTextTransformFunctionName"
      },
      flexContainerName = "bigquery-to-elasticsearch-xlang",
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/bigquery-to-elasticsearch",
      contactInformation = "https://cloud.google.com/support",
      preview = true,
      requirements = {
        "The source BigQuery table must exist.",
        "A Elasticsearch host on a Google Cloud instance or on Elastic Cloud with Elasticsearch version 7.0 or above and should be accessible from the Dataflow worker machines.",
      })
})
public class BigQueryToElasticsearch {
  /**
   * Main entry point for pipeline execution.
   *
   * @param args Command line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    BigQueryToElasticsearchOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(BigQueryToElasticsearchOptions.class);

    run(options);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  private static PipelineResult run(BigQueryToElasticsearchOptions options) {

    // Create the pipeline.
    Pipeline pipeline = Pipeline.create(options);
    /*
     * Steps: 1) Read records from BigQuery via BigQueryIO.
     *        2) Create json string from Table Row.
     *        3) Write records to Elasticsearch.
     */

    boolean useJavascriptUdf = !Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath());
    boolean usePythonUdf = !Strings.isNullOrEmpty(options.getPythonExternalTextTransformGcsPath());
    if (useJavascriptUdf && usePythonUdf) {
      throw new IllegalArgumentException(
          "Either javascript or Python gcs path must be provided, but not both.");
    }

    /*
     * Step #1: Read from BigQuery. If a query is provided then it is used to get the TableRows.
     */
    PCollection<String> readJsonDocuments =
        pipeline
            .apply(
                "ReadFromBigQuery",
                ReadBigQueryTableRows.newBuilder()
                    .setOptions(options.as(BigQueryToElasticsearchOptions.class))
                    .build())

            /*
             * Step #2: Convert table rows to JSON documents.
             */
            .apply("TableRowsToJsonDocument", ParDo.of(new TableRowToJsonFn()));

    /*
     * Step #3: Apply UDF functions (if specified)
     */
    PCollection<String> udfOut;
    if (usePythonUdf) {
      udfOut =
          readJsonDocuments
              .apply(
                  "MapToRecord",
                  PythonExternalTextTransformer.FailsafeRowPythonExternalUdf
                      .stringMappingFunction())
              .setRowSchema(PythonExternalTextTransformer.FailsafeRowPythonExternalUdf.ROW_SCHEMA)
              .apply(
                  "InvokeUDF",
                  PythonExternalTextTransformer.FailsafePythonExternalUdf.newBuilder()
                      .setFileSystemPath(options.getPythonExternalTextTransformGcsPath())
                      .setFunctionName(options.getPythonExternalTextTransformFunctionName())
                      .build())
              .apply(
                  "MapToStringElements",
                  ParDo.of(new PythonExternalTextTransformer.RowToStringElementFn()));
    } else {
      udfOut =
          readJsonDocuments.apply(
              TransformTextViaJavascript.newBuilder()
                  .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                  .setFunctionName(options.getJavascriptTextTransformFunctionName())
                  .build());
    }

    /*
     * Step #4: Write converted records to Elasticsearch
     */
    udfOut.apply(
        "WriteToElasticsearch",
        WriteToElasticsearch.newBuilder()
            .setUserAgent("dataflow-bigquery-to-elasticsearch-template/v2")
            .setOptions(options.as(BigQueryToElasticsearchOptions.class))
            .build());

    return pipeline.run();
  }
}

次のステップ