Streaming Data Generator to Pub/Sub, BigQuery, and Cloud Storage テンプレート

このストリーミング データ生成ツール テンプレートは、ユーザーが指定したスキーマに基づいて、指定されたレートで無制限または固定数の合成レコードまたはメッセージを生成するために使用されます。対応している宛先には、Pub/Sub トピック、BigQuery テーブル、Cloud Storage バケットがあります。

次のようなユースケースが考えられます。

  • Pub/Sub トピックへの大規模でリアルタイムのイベント公開をシミュレーションし、公開されたイベントを処理するために必要な受信者の数と規模を測定して判断します。
  • パフォーマンス ベンチマークを評価する、または概念実証として機能するには、BigQuery テーブルまたは Cloud Storage バケットに合成データを生成します。
次の表は、このテンプレートでサポートされるシンクおよびエンコード形式を示したものです。
JSON Avro Parquet
Pub/Sub ×
BigQuery いいえ ×
Cloud Storage はい

パイプラインの要件

  • ワーカー サービス アカウントを使用するには、Dataflow ワーカー(roles/dataflow.worker)にロールが割り当てられている必要があります。詳細については、IAM の概要をご覧ください。
  • 生成されたデータの JSON テンプレートを含むスキーマ ファイルを作成します。このテンプレートは JSON データ生成ツール ライブラリを使用しているため、スキーマの各フィールドにさまざまな faker 関数を指定できます。詳細については、json-data-generator ドキュメントをご覧ください。

    次に例を示します。

    {
      "id": {{integer(0,1000)}},
      "name": "{{uuid()}}",
      "isInStock": {{bool()}}
    }
    
  • スキーマ ファイルを Cloud Storage バケットにアップロードします。
  • 実行する前に出力ターゲットが存在している必要があります。ターゲットは、シンクタイプに応じて、Pub/Sub トピック、BigQuery テーブル、Cloud Storage バケットのいずれかである必要があります。
  • 出力エンコードが Avro または Parquet の場合は、Avro スキーマ ファイルを作成し、Cloud Storage の場所に保存します。
  • 目的の宛先に応じて、ワーカー サービス アカウントに追加の IAM ロールを割り当てます。
    宛先 さらに必要な IAM ロール 適用先のリソース
    Pub/Sub Pub/Sub パブリッシャー(roles/pubsub.publisher
    (詳細については、IAM による Pub/Sub のアクセス制御をご覧ください)
    Pub/Sub トピック
    BigQuery BigQuery データ編集者(roles/bigquery.dataEditor
    (詳細については、IAM による BigQuery のアクセス制御をご覧ください)
    BigQuery データセット
    Cloud Storage Cloud Storage オブジェクト管理者(roles/storage.objectAdmin
    (詳細については、IAM による Cloud Storage のアクセス制御をご覧ください)
    Cloud Storage バケット

テンプレートのパラメータ

パラメータ 説明
schemaLocation スキーマ ファイルの場所。例: gs://mybucket/filename.json
qps 1 秒あたりにパブリッシュされるメッセージ数。例: 100
sinkType (省略可)出力シンクのタイプ。指定可能な値は PUBSUBBIGQUERYGCS です。デフォルトは PUBSUB です。
outputType (省略可)出力エンコード タイプ。指定可能な値は JSONAVROPARQUET です。デフォルトは JSON です。
avroSchemaLocation (省略可)AVRO スキーマ ファイルの場所。outputType が AVRO または PARQUET の場合は必須。例: gs://mybucket/filename.avsc
topic (省略可)パイプラインがデータを公開する Pub/Sub トピックの名前。sinkType が Pub/Sub の場合は必須。例: projects/my-project-id/topics/my-topic-id
outputTableSpec (省略可)出力 BigQuery テーブルの名前。sinkType が BigQuery の場合は必須。例: my-project-ID:my_dataset_name.my-table-name
writeDisposition (省略可)BigQuery の書き込み処理。指定可能な値は WRITE_APPENDWRITE_EMPTYWRITE_TRUNCATE です。デフォルトは WRITE_APPEND です。
outputDeadletterTable (省略可)失敗したレコードを格納する出力 BigQuery テーブルの名前。指定されていない場合、パイプラインは実行中に {output_table_name}_error_records という名前のテーブルを作成します。例: my-project-ID:my_dataset_name.my-table-name
outputDirectory (省略可)出力される Cloud Storage の場所のパス。sinkType が Cloud Storage の場合は必須。例: gs://mybucket/pathprefix/
outputFilenamePrefix (省略可)Cloud Storage に書き込まれる出力ファイルのファイル名の接頭辞。デフォルトは output- です。
windowDuration (省略可)出力が Cloud Storage に書き込まれる時間間隔。デフォルトは 1m(つまり 1 分)です。
numShards (省略可)出力シャードの最大数。sinkType が Cloud Storage の場合に必須で、1 以上の数値に設定する必要があります。
messagesLimit (省略可)出力メッセージの最大数。デフォルトは 0 で、制限がないことを示します。
autoscalingAlgorithm (省略可)ワーカーの自動スケーリングに使用されるアルゴリズム。使用できる値は、自動スケーリングを有効にする THROUGHPUT_BASED または無効にする NONE です。
maxNumWorkers (省略可)ワーカーマシンの最大数。例: 10

テンプレートを実行する

  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the Streaming Data Generator template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • SCHEMA_LOCATION: Cloud Storage のスキーマ ファイルのパス。例: gs://mybucket/filename.json
  • QPS: 1 秒あたりにパブリッシュされるメッセージ数
  • PUBSUB_TOPIC: 出力 Pub/Sub トピック。例: projects/my-project-id/topics/my-topic-id

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "qps": "QPS",
          "topic": "PUBSUB_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Streaming_Data_Generator",
   }
}
  

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクトの ID
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • JOB_NAME: 一意の任意のジョブ名
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • SCHEMA_LOCATION: Cloud Storage のスキーマ ファイルのパス。例: gs://mybucket/filename.json
  • QPS: 1 秒あたりにパブリッシュされるメッセージ数
  • PUBSUB_TOPIC: 出力 Pub/Sub トピック。例: projects/my-project-id/topics/my-topic-id
/*
 * Copyright (C) 2020 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

import com.github.vincentrussell.json.datagenerator.JsonDataGenerator;
import com.github.vincentrussell.json.datagenerator.JsonDataGeneratorException;
import com.github.vincentrussell.json.datagenerator.impl.JsonDataGeneratorImpl;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.templates.StreamingDataGenerator.StreamingDataGeneratorOptions;
import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToBigQuery;
import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToGcs;
import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToJdbc;
import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToKafka;
import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToPubSub;
import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToSpanner;
import com.google.cloud.teleport.v2.utils.DurationUtils;
import com.google.cloud.teleport.v2.utils.GCSUtils;
import com.google.cloud.teleport.v2.utils.MetadataValidator;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import javax.annotation.Nonnull;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Duration;
import org.joda.time.Instant;

/**
 * The {@link StreamingDataGenerator} is a streaming pipeline which generates messages at a
 * specified rate to either Pub/Sub, BigQuery, GCS, JDBC, or Spanner. The messages are generated
 * according to a schema template which instructs the pipeline how to populate the messages with
 * fake data compliant to constraints.
 *
 * <p>The number of workers executing the pipeline must be large enough to support the supplied QPS.
 * Use a general rule of 2,500 QPS per core in the worker pool.
 *
 * <p>See <a href="https://github.com/vincentrussell/json-data-generator">json-data-generator</a>
 * for instructions on how to construct the schema file.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/streaming-data-generator/README_Streaming_Data_Generator.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Streaming_Data_Generator",
    category = TemplateCategory.UTILITIES,
    displayName = "Streaming Data Generator",
    description =
        "A pipeline to publish messages at specified QPS.This template can be used to benchmark"
            + " performance of streaming pipelines.",
    optionsClass = StreamingDataGeneratorOptions.class,
    flexContainerName = "streaming-data-generator",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/streaming-data-generator",
    contactInformation = "https://cloud.google.com/support",
    streaming = true,
    supportsAtLeastOnce = true)
public class StreamingDataGenerator {

  /**
   * The {@link StreamingDataGeneratorOptions} class provides the custom execution options passed by
   * the executor at the command-line.
   */
  public interface StreamingDataGeneratorOptions extends PipelineOptions {
    @TemplateParameter.Long(
        order = 1,
        description = "Required output rate",
        helpText = "Indicates rate of messages per second to be published to Pub/Sub")
    @Required
    Long getQps();

    void setQps(Long value);

    @TemplateParameter.Enum(
        order = 2,
        enumOptions = {@TemplateEnumOption("GAME_EVENT")},
        optional = true,
        description = "Schema template to generate fake data",
        helpText = "Pre-existing schema template to use. The value must be one of: [GAME_EVENT]")
    SchemaTemplate getSchemaTemplate();

    void setSchemaTemplate(SchemaTemplate value);

    @TemplateParameter.GcsReadFile(
        order = 3,
        optional = true,
        description = "Location of Schema file to generate fake data",
        helpText = "Cloud Storage path of schema location.",
        example = "gs://<bucket-name>/prefix")
    String getSchemaLocation();

    void setSchemaLocation(String value);

    @TemplateParameter.PubsubTopic(
        order = 4,
        optional = true,
        description = "Output Pub/Sub topic",
        helpText = "The name of the topic to which the pipeline should publish data.",
        example = "projects/<project-id>/topics/<topic-name>")
    String getTopic();

    void setTopic(String value);

    @TemplateParameter.Long(
        order = 5,
        optional = true,
        description = "Maximum number of output Messages",
        helpText =
            "Indicates maximum number of output messages to be generated. 0 means unlimited.")
    @Default.Long(0L)
    Long getMessagesLimit();

    void setMessagesLimit(Long value);

    @TemplateParameter.Enum(
        order = 6,
        enumOptions = {
          @TemplateEnumOption("AVRO"),
          @TemplateEnumOption("JSON"),
          @TemplateEnumOption("PARQUET")
        },
        optional = true,
        description = "Output Encoding Type",
        helpText = "The message Output type. Default is JSON.")
    @Default.Enum("JSON")
    OutputType getOutputType();

    void setOutputType(OutputType value);

    @TemplateParameter.GcsReadFile(
        order = 7,
        optional = true,
        parentName = "outputType",
        parentTriggerValues = {"AVRO", "PARQUET"},
        description = "Location of Avro Schema file",
        helpText =
            "Cloud Storage path of Avro schema location. Mandatory when output type is AVRO or"
                + " PARQUET.",
        example = "gs://your-bucket/your-path/schema.avsc")
    String getAvroSchemaLocation();

    void setAvroSchemaLocation(String value);

    @TemplateParameter.Enum(
        order = 8,
        enumOptions = {
          @TemplateEnumOption("BIGQUERY"),
          @TemplateEnumOption("GCS"),
          @TemplateEnumOption("PUBSUB"),
          @TemplateEnumOption("JDBC"),
          @TemplateEnumOption("SPANNER"),
          @TemplateEnumOption("KAFKA")
        },
        optional = true,
        description = "Output Sink Type",
        helpText = "The message Sink type. Default is PUBSUB")
    @Default.Enum("PUBSUB")
    SinkType getSinkType();

    void setSinkType(SinkType value);

    @TemplateParameter.BigQueryTable(
        order = 9,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"BIGQUERY"},
        description = "Output BigQuery table",
        helpText = "Output BigQuery table. Mandatory when sinkType is BIGQUERY",
        example = "<project>:<dataset>.<table_name>")
    String getOutputTableSpec();

    void setOutputTableSpec(String value);

    @TemplateParameter.Enum(
        order = 10,
        enumOptions = {
          @TemplateEnumOption("WRITE_APPEND"),
          @TemplateEnumOption("WRITE_EMPTY"),
          @TemplateEnumOption("WRITE_TRUNCATE")
        },
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"BIGQUERY"},
        description = "Write Disposition to use for BigQuery",
        helpText =
            "BigQuery WriteDisposition. For example, WRITE_APPEND, WRITE_EMPTY or WRITE_TRUNCATE.")
    @Default.String("WRITE_APPEND")
    String getWriteDisposition();

    void setWriteDisposition(String writeDisposition);

    @TemplateParameter.BigQueryTable(
        order = 11,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"BIGQUERY"},
        description = "The dead-letter table name to output failed messages to BigQuery",
        helpText =
            "Messages failed to reach the output table for all kind of reasons (e.g., mismatched"
                + " schema, malformed json) are written to this table. If it doesn't exist, it will"
                + " be created during pipeline execution.",
        example = "your-project-id:your-dataset.your-table-name")
    String getOutputDeadletterTable();

    void setOutputDeadletterTable(String outputDeadletterTable);

    @TemplateParameter.Duration(
        order = 12,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"GCS"},
        description = "Window duration",
        helpText =
            "The window duration/size in which data will be written to Cloud Storage. Allowed"
                + " formats are: Ns (for seconds, example: 5s), Nm (for minutes, example: 12m), Nh"
                + " (for hours, example: 2h).",
        example = "1m")
    @Default.String("1m")
    String getWindowDuration();

    void setWindowDuration(String windowDuration);

    @TemplateParameter.GcsWriteFolder(
        order = 13,
        optional = true,
        description = "Output file directory in Cloud Storage",
        helpText =
            "The path and filename prefix for writing output files. Must end with a slash. DateTime"
                + " formatting is used to parse directory path for date & time formatters.",
        example = "gs://your-bucket/your-path/")
    String getOutputDirectory();

    void setOutputDirectory(String outputDirectory);

    @TemplateParameter.Text(
        order = 14,
        optional = true,
        description = "Output filename prefix of the files to write",
        helpText = "The prefix to place on each windowed file.",
        example = "output-")
    @Default.String("output-")
    String getOutputFilenamePrefix();

    void setOutputFilenamePrefix(String outputFilenamePrefix);

    @TemplateParameter.Integer(
        order = 15,
        optional = true,
        description = "Maximum output shards",
        helpText =
            "The maximum number of output shards produced when writing. A higher number of shards"
                + " means higher throughput for writing to Cloud Storage, but potentially higher"
                + " data aggregation cost across shards when processing output Cloud Storage files."
                + " Default value is decided by Dataflow.")
    @Default.Integer(0)
    Integer getNumShards();

    void setNumShards(Integer numShards);

    @TemplateParameter.Text(
        order = 16,
        optional = true,
        regexes = {"^.+$"},
        description = "JDBC driver class name.",
        helpText = "JDBC driver class name to use.",
        example = "com.mysql.jdbc.Driver")
    String getDriverClassName();

    void setDriverClassName(String driverClassName);

    @TemplateParameter.Text(
        order = 17,
        optional = true,
        regexes = {
          "(^jdbc:[a-zA-Z0-9/:@.?_+!*=&-;]+$)|(^([A-Za-z0-9+/]{4}){1,}([A-Za-z0-9+/]{0,3})={0,3})"
        },
        description = "JDBC connection URL string.",
        helpText = "Url connection string to connect to the JDBC source.",
        example = "jdbc:mysql://some-host:3306/sampledb")
    String getConnectionUrl();

    void setConnectionUrl(String connectionUrl);

    @TemplateParameter.Text(
        order = 18,
        optional = true,
        regexes = {"^.+$"},
        description = "JDBC connection username.",
        helpText = "User name to be used for the JDBC connection.")
    String getUsername();

    void setUsername(String username);

    @TemplateParameter.Password(
        order = 19,
        optional = true,
        description = "JDBC connection password.",
        helpText = "Password to be used for the JDBC connection.")
    String getPassword();

    void setPassword(String password);

    @TemplateParameter.Text(
        order = 20,
        optional = true,
        regexes = {"^[a-zA-Z0-9_;!*&=@#-:\\/]+$"},
        description = "JDBC connection property string.",
        helpText =
            "Properties string to use for the JDBC connection. Format of the string must be"
                + " [propertyName=property;]*.",
        example = "unicode=true;characterEncoding=UTF-8")
    String getConnectionProperties();

    void setConnectionProperties(String connectionProperties);

    @TemplateParameter.Text(
        order = 21,
        optional = true,
        regexes = {"^.+$"},
        description = "Statement which will be executed against the database.",
        helpText =
            "SQL statement which will be executed to write to the database. The statement must"
                + " specify the column names of the table in any order. Only the values of the"
                + " specified column names will be read from the json and added to the statement.",
        example = "INSERT INTO tableName (column1, column2) VALUES (?,?)")
    String getStatement();

    void setStatement(String statement);

    @TemplateParameter.ProjectId(
        order = 22,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"SPANNER"},
        description = "GCP Project Id of where the Spanner table lives.",
        helpText = "GCP Project Id of where the Spanner table lives.")
    String getProjectId();

    void setProjectId(String projectId);

    @TemplateParameter.Text(
        order = 23,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"SPANNER"},
        regexes = {"^.+$"},
        description = "Cloud Spanner instance name.",
        helpText = "Cloud Spanner instance name.")
    String getSpannerInstanceName();

    void setSpannerInstanceName(String spannerInstanceName);

    @TemplateParameter.Text(
        order = 24,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"SPANNER"},
        regexes = {"^.+$"},
        description = "Cloud Spanner database name.",
        helpText = "Cloud Spanner database name.")
    String getSpannerDatabaseName();

    void setSpannerDatabaseName(String spannerDBName);

    @TemplateParameter.Text(
        order = 25,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"SPANNER"},
        regexes = {"^.+$"},
        description = "Cloud Spanner table name.",
        helpText = "Cloud Spanner table name.")
    String getSpannerTableName();

    void setSpannerTableName(String spannerTableName);

    @TemplateParameter.Long(
        order = 26,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"SPANNER"},
        description = "Max mutatated cells per batch.",
        helpText =
            "Specifies the cell mutation limit (maximum number of mutated cells per batch). Default value is 5000")
    Long getMaxNumMutations();

    void setMaxNumMutations(Long value);

    @TemplateParameter.Long(
        order = 27,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"SPANNER"},
        description = "Max rows per batch.",
        helpText =
            "Specifies the row mutation limit (maximum number of mutated rows per batch). Default value is 1000")
    Long getMaxNumRows();

    void setMaxNumRows(Long value);

    @TemplateParameter.Long(
        order = 28,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"SPANNER"},
        description = "Max batch size in bytes.",
        helpText =
            "Specifies the batch size limit (max number of bytes mutated per batch). Default value is 1MB")
    Long getBatchSizeBytes();

    void setBatchSizeBytes(Long value);

    @TemplateParameter.Long(
        order = 29,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"SPANNER"},
        description = "Commit deadline in seconds for write requests.",
        helpText = "Specifies the deadline in seconds for the Commit API call.")
    Long getCommitDeadlineSeconds();

    void setCommitDeadlineSeconds(Long value);

    @TemplateParameter.Text(
        order = 30,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"KAFKA"},
        regexes = {"[,:a-zA-Z0-9._-]+"},
        description = "Output Kafka Bootstrap Server",
        helpText = "Kafka Bootstrap Server ",
        example = "localhost:9092")
    String getBootstrapServer();

    void setBootstrapServer(String bootstrapServer);

    @TemplateParameter.Text(
        order = 31,
        optional = true,
        parentName = "sinkType",
        parentTriggerValues = {"KAFKA"},
        regexes = {"[a-zA-Z0-9._-]+"},
        description = "Kafka topic to write to",
        helpText = "Kafka topic to write to.",
        example = "topic")
    String getKafkaTopic();

    void setKafkaTopic(String outputTopic);
  }

  /** Allowed list of existing schema templates. */
  public enum SchemaTemplate {
    GAME_EVENT(
        "{\n"
            + "  \"eventId\": \"{{uuid()}}\",\n"
            + "  \"eventTimestamp\": {{timestamp()}},\n"
            + "  \"ipv4\": \"{{ipv4()}}\",\n"
            + "  \"ipv6\": \"{{ipv6()}}\",\n"
            + "  \"country\": \"{{country()}}\",\n"
            + "  \"username\": \"{{username()}}\",\n"
            + "  \"quest\": \"{{random(\"A Break In the Ice\", \"Ghosts of Perdition\", \"Survive"
            + " the Low Road\")}}\",\n"
            + "  \"score\": {{integer(100, 10000)}},\n"
            + "  \"completed\": {{bool()}}\n"
            + "}"),
    LOG_ENTRY(
        "{\n"
            + "  \"logName\": \"{{alpha(10,20)}}\",\n"
            + "  \"resource\": {\n"
            + "    \"type\": \"{{alpha(5,10)}}\"\n"
            + "  },\n"
            + "  \"timestamp\": {{timestamp()}},\n"
            + "  \"receiveTimestamp\": {{timestamp()}},\n"
            + "  \"severity\": \"{{random(\"DEFAULT\", \"DEBUG\", \"INFO\", \"NOTICE\","
            + " \"WARNING\", \"ERROR\", \"CRITICAL\", \"ERROR\")}}\",\n"
            + "  \"insertId\": \"{{uuid()}}\",\n"
            + "  \"trace\": \"{{uuid()}}\",\n"
            + "  \"spanId\": \"{{uuid()}}\",\n"
            + "  \"jsonPayload\": {\n"
            + "    \"bytes_sent\": {{integer(1000,20000)}},\n"
            + "    \"connection\": {\n"
            + "      \"dest_ip\": \"{{ipv4()}}\",\n"
            + "      \"dest_port\": {{integer(0,65000)}},\n"
            + "      \"protocol\": {{integer(0,6)}},\n"
            + "      \"src_ip\": \"{{ipv4()}}\",\n"
            + "      \"src_port\": {{integer(0,65000)}}\n"
            + "    },\n"
            + "    \"dest_instance\": {\n"
            + "      \"project_id\": \"{{concat(\"PROJECT\", integer(0,3))}}\",\n"
            + "      \"region\": \"{{country()}}\",\n"
            + "      \"vm_name\": \"{{username()}}\",\n"
            + "      \"zone\": \"{{state()}}\"\n"
            + "    },\n"
            + "    \"end_time\": {{timestamp()}},\n"
            + "    \"packets_sent\": {{integer(100,400)}},\n"
            + "    \"reporter\": \"{{random(\"SRC\", \"DEST\")}}\",\n"
            + "    \"rtt_msec\": {{integer(0,20)}},\n"
            + "    \"start_time\": {{timestamp()}}\n"
            + "  }\n"
            + "}");

    private final String schema;

    SchemaTemplate(String schema) {
      this.schema = schema;
    }

    public String getSchema() {
      return schema;
    }
  }

  /** Allowed list of message encoding types. */
  public enum OutputType {
    JSON(".json"),
    AVRO(".avro"),
    PARQUET(".parquet");

    private final String fileExtension;

    /** Sets file extension associated with output type. */
    OutputType(String fileExtension) {
      this.fileExtension = fileExtension;
    }

    /** Returns file extension associated with output type. */
    public String getFileExtension() {
      return fileExtension;
    }
  }

  /** Allowed list of sink types. */
  public enum SinkType {
    PUBSUB,
    BIGQUERY,
    GCS,
    JDBC,
    SPANNER,
    KAFKA
  }

  /**
   * The main entry-point for pipeline execution. This method will start the pipeline but will not
   * wait for it's execution to finish. If blocking execution is required, use the {@link
   * StreamingDataGenerator#run(StreamingDataGeneratorOptions)} method to start the pipeline and
   * invoke {@code result.waitUntilFinish()} on the {@link PipelineResult}.
   *
   * @param args command-line args passed by the executor.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    StreamingDataGeneratorOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(StreamingDataGeneratorOptions.class);

    run(options);
  }

  /**
   * Runs the pipeline to completion with the specified options. This method does not wait until the
   * pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
   * object to block until the pipeline is finished running if blocking programmatic execution is
   * required.
   *
   * @param options the execution options.
   * @return the pipeline result.
   */
  public static PipelineResult run(@Nonnull StreamingDataGeneratorOptions options) {
    checkNotNull(options, "options argument to run method cannot be null.");
    MetadataValidator.validate(options);

    // FileSystems does not set the default configuration in workers till Pipeline.run
    // Explicitly registering standard file systems.
    FileSystems.setDefaultPipelineOptions(options);
    String schema = getSchema(options.getSchemaTemplate(), options.getSchemaLocation());

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    /*
     * Steps:
     *  1) Trigger at the supplied QPS
     *  2) Generate messages containing fake data
     *  3) Write messages to appropriate Sink
     */
    PCollection<byte[]> generatedMessages =
        pipeline
            .apply("Trigger", createTrigger(options))
            .apply("Generate Fake Messages", ParDo.of(new MessageGeneratorFn(schema)));

    if (options.getSinkType().equals(SinkType.GCS)) {
      generatedMessages =
          generatedMessages.apply(
              options.getWindowDuration() + " Window",
              Window.into(
                  FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))));
    }

    generatedMessages.apply(
        "Write To " + options.getSinkType().name(), createSink(options, schema));

    return pipeline.run();
  }

  /**
   * Creates either Bounded or UnBounded Source based on messageLimit pipeline option.
   *
   * @param options the pipeline options.
   */
  private static GenerateSequence createTrigger(@Nonnull StreamingDataGeneratorOptions options) {
    checkNotNull(options, "options argument to createTrigger method cannot be null.");
    GenerateSequence generateSequence =
        GenerateSequence.from(0L)
            .withRate(options.getQps(), /* periodLength= */ Duration.standardSeconds(1L));

    return options.getMessagesLimit() > 0
        ? generateSequence.to(options.getMessagesLimit())
        : generateSequence;
  }

  /**
   * The {@link MessageGeneratorFn} class generates fake messages based on supplied schema
   *
   * <p>See <a href="https://github.com/vincentrussell/json-data-generator">json-data-generator</a>
   * for instructions on how to construct the schema file.
   */
  @VisibleForTesting
  static class MessageGeneratorFn extends DoFn<Long, byte[]> {

    // Not initialized inline or constructor because {@link JsonDataGenerator} is not serializable.
    private transient JsonDataGenerator dataGenerator;
    private final String schema;

    MessageGeneratorFn(String schema) {
      this.schema = schema;
    }

    @Setup
    public void setup() {
      dataGenerator = new JsonDataGeneratorImpl();
    }

    @ProcessElement
    public void processElement(
        @Element Long element,
        @Timestamp Instant timestamp,
        OutputReceiver<byte[]> receiver,
        ProcessContext context)
        throws IOException, JsonDataGeneratorException {

      byte[] payload;

      // Generate the fake JSON according to the schema.
      try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
        dataGenerator.generateTestDataJson(schema, byteArrayOutputStream);
        payload = byteArrayOutputStream.toByteArray();
      }

      receiver.output(payload);
    }
  }

  /**
   * Creates appropriate sink based on sinkType pipeline option.
   *
   * @param options the pipeline options.
   */
  @VisibleForTesting
  static PTransform<PCollection<byte[]>, PDone> createSink(
      @Nonnull StreamingDataGeneratorOptions options, @Nonnull String schema) {
    checkNotNull(options, "options argument to createSink method cannot be null.");
    checkNotNull(schema, "schema argument to createSink method cannot be null.");

    switch (options.getSinkType()) {
      case PUBSUB:
        checkArgument(
            options.getTopic() != null,
            String.format(
                "Missing required value --topic for %s sink type", options.getSinkType().name()));
        return StreamingDataGeneratorWriteToPubSub.Writer.builder(options, schema).build();
      case BIGQUERY:
        checkArgument(
            options.getOutputTableSpec() != null,
            String.format(
                "Missing required value --outputTableSpec in format"
                    + " <project>:<dataset>.<table_name> for %s sink type",
                options.getSinkType().name()));
        return StreamingDataGeneratorWriteToBigQuery.builder(options).build();
      case GCS:
        checkArgument(
            options.getOutputDirectory() != null,
            String.format(
                "Missing required value --outputDirectory in format gs:// for %s sink type",
                options.getSinkType().name()));
        return StreamingDataGeneratorWriteToGcs.builder(options).build();
      case JDBC:
        checkArgument(
            options.getDriverClassName() != null,
            String.format(
                "Missing required value --driverClassName for %s sink type",
                options.getSinkType().name()));
        checkArgument(
            options.getConnectionUrl() != null,
            String.format(
                "Missing required value --connectionUrl for %s sink type",
                options.getSinkType().name()));
        checkArgument(
            options.getStatement() != null,
            String.format(
                "Missing required value --statement for %s sink type",
                options.getSinkType().name()));
        return StreamingDataGeneratorWriteToJdbc.builder(options).build();
      case SPANNER:
        checkArgument(
            options.getProjectId() != null,
            String.format(
                "Missing required value --projectId for %s sink type",
                options.getSinkType().name()));
        checkArgument(
            options.getSpannerInstanceName() != null,
            String.format(
                "Missing required value --spannerInstanceName for %s sink type",
                options.getSinkType().name()));
        checkArgument(
            options.getSpannerDatabaseName() != null,
            String.format(
                "Missing required value --spannerDatabaseName for %s sink type",
                options.getSinkType().name()));
        checkArgument(
            options.getSpannerTableName() != null,
            String.format(
                "Missing required value --spannerTableName for %s sink type",
                options.getSinkType().name()));
        return StreamingDataGeneratorWriteToSpanner.builder(options).build();
      case KAFKA:
        checkArgument(
            options.getBootstrapServer() != null,
            String.format(
                "Missing required value --bootstrapServer for %s sink type",
                options.getSinkType().name()));
        checkArgument(
            options.getKafkaTopic() != null,
            String.format(
                "Missing required value --kafkaTopic for %s sink type",
                options.getSinkType().name()));
        return StreamingDataGeneratorWriteToKafka.Writer.builder(options).build();
      default:
        throw new IllegalArgumentException("Unsupported Sink.");
    }
  }

  private static String getSchema(SchemaTemplate schemaTemplate, String schemaLocation) {
    checkArgument(
        schemaTemplate != null || schemaLocation != null,
        "Either schemaTemplate or schemaLocation argument of MessageGeneratorFn class must be"
            + " provided.");
    if (schemaLocation != null) {
      return GCSUtils.getGcsFileAsString(schemaLocation);
    } else {
      return schemaTemplate.getSchema();
    }
  }
}

次のステップ