Dataflow から BigQuery に書き込む

このドキュメントでは、Apache Beam BigQuery I/O コネクタを使用して Dataflow から BigQuery にデータを書き込む方法について説明します。

BigQuery I/O コネクタは Apache Beam SDK で使用できます。最新バージョンの SDK を使用することをおすすめします。詳細については、Apache Beam 2.x SDK をご覧ください。

Python の言語間変換サポートも利用できます。

概要

BigQuery I/O コネクタでは、BigQuery への書き込みで次のメソッドがサポートされています。

  • STORAGE_WRITE_API。このモードでは、コネクタは BigQuery Storage Write API を使用して BigQuery ストレージに直接書き込みを行います。Storage Write API は、ストリーミングの取り込みとバッチ読み込みを 1 つの高性能 API にまとめたものです。このモードでは exactly-once セマンティクスが保証されます。
  • STORAGE_API_AT_LEAST_ONCE。このモードでは Storage Write API も使用しますが、at-least-once セマンティクスが提供されます。このモードを使用すると、ほとんどのパイプラインでレイテンシが短縮されます。ただし、重複書き込みが発生する可能性があります。
  • FILE_LOADS。このモードでは、入力データを Cloud Storage のステージング ファイルに書き込みます。その後、BigQuery の読み込みジョブを実行して、データを BigQuery に読み込みます。 このモードは、バッチ パイプラインで最もよく見られる制限付き PCollections のデフォルトです。
  • STREAMING_INSERTS。このモードでは、コネクタは以前のストリーミング API を使用します。このモードは、制限なし PCollections のデフォルトですが、新しいプロジェクトでは推奨されません。

書き込み方法を選択する際は、次の点を考慮してください。

  • ストリーミング ジョブの場合は、STORAGE_WRITE_API または STORAGE_API_AT_LEAST_ONCE の使用を検討してください。これらのモードでは、中間ステージング ファイルを使用せずに、BigQuery ストレージに直接書き込みます。
  • 1 回以上のストリーミング モードを使用してパイプラインを実行する場合は、書き込みモードを STORAGE_API_AT_LEAST_ONCE に設定します。この設定はより効率的で、1 回以上のストリーミング モードのセマンティクスと一致します。
  • ファイルの読み込みと Storage Write API では割り当てと上限が異なります。
  • 読み込みジョブは、共有 BigQuery スロットプールまたは予約済みスロットのいずれかを使用します。予約済みスロットを使用するには、PIPELINE タイプの予約割り当てのプロジェクトで読み込みジョブを実行します。共有 BigQuery スロットプールを使用する場合、読み込みジョブは無料です。ただし、BigQuery は共有プールで使用可能な容量について保証しません。詳細については、予約の概要をご覧ください。

並列処理

  • ストリーミング パイプラインでの FILE_LOADSSTORAGE_WRITE_API の場合、コネクタはデータを多数のファイルまたはストリームにシャーディングします。通常は、withAutoSharding を呼び出して自動シャーディングを有効にすることをおすすめします。

  • バッチ パイプラインの FILE_LOADS の場合、コネクタはパーティション分割ファイルにデータを書き込み、その後、BigQuery に並列に読み込まれます。

  • バッチ パイプライン内の STORAGE_WRITE_API の場合、各ワーカーはシャードの合計数によって BigQuery に書き込む 1 つ以上のストリームを作成します。

  • STORAGE_API_AT_LEAST_ONCE の場合、デフォルトの書き込みストリームが 1 つあります。このストリームには複数のワーカーが追加されます。

パフォーマンス

次の表に、BigQuery I/O のさまざまな読み取りオプションのパフォーマンス指標を示します。ワークロードは、Apache Beam SDK 2.49.0 for Java を使用して、1 つの e2-standard2 ワーカーで実行されています。Runner v2 は使用されていません。

1 億件のレコード | 1 KB | 1 列 スループット(バイト) スループット(要素)
ストレージ書き込み 55 MBps 54,000 要素/秒
Avro の読み込み 78 MBps 77,000 要素/秒
Json の読み込み 54 MBps 53,000 要素/秒

これらの指標は、単純なバッチ パイプラインに基づいています。これは I/O コネクタ間でのパフォーマンスの比較を目的としており、必ずしも実際のパイプラインを表すものではありません。Dataflow パイプラインのパフォーマンスは複雑で、VM タイプ、処理されるデータ、外部のソースとシンクのパフォーマンス、ユーザーコードに左右されます。指標は Java SDK の実行に基づくものであり、他の言語 SDK のパフォーマンス特性を表すものではありません。詳細については、Beam I/O のパフォーマンスをご覧ください。

ベスト プラクティス

このセクションでは、Dataflow から BigQuery に書き込むためのベスト プラクティスを説明します。

一般的な考慮事項

  • Storage Write API には割り当て上限があります。コネクタは、ほとんどのパイプラインでこれらの上限を処理します。ただし、一部のシナリオでは使用可能な Storage Write API ストリームが使い果たされる可能性があります。たとえば、特に負荷の高いワークロードが長時間実行されるジョブでは、多数の宛先を持つ自動シャーディングと自動スケーリングを使用するパイプラインで、この問題が発生することがあります。この問題が発生した場合は、STORAGE_WRITE_API_AT_LEAST_ONCE の使用を検討してください。これにより、問題を回避できます。

  • Google Cloud の指標を使用して、Storage Write API の割り当て使用量をモニタリングします。

  • ファイル読み込みを使用する場合、Avro は通常 JSON よりパフォーマンスが優れています。Avro を使用するには、withAvroFormatFunction を呼び出します。

  • デフォルトでは、読み込みジョブは Dataflow ジョブと同じプロジェクトで実行されます。別のプロジェクトを指定するには、withLoadJobProjectId を呼び出します。

  • Java SDK を使用する場合は、BigQuery テーブルのスキーマを表すクラスを作成することを検討してください。次に、パイプラインで useBeamSchema を呼び出して、Apache Beam Row 型と BigQuery の TableRow 型を自動的に変換します。スキーマ クラスの例については、ExampleModel.java をご覧ください。

  • 何千ものフィールドを含む複雑なスキーマを持つテーブルを読み込む場合は、withMaxBytesPerPartition を呼び出して、読み込みジョブごとの最大サイズを小さく設定することを検討してください。

ストリーミング パイプライン

ストリーミング パイプラインには、次の推奨事項が適用されます。

  • ストリーミング パイプラインには、Storage Write API(STORAGE_WRITE_API または STORAGE_API_AT_LEAST_ONCE)を使用することをおすすめします。

  • ストリーミング パイプラインはファイル読み込みを使用できますが、次のような欠点があります。

    • ファイルを書き込むには、ウィンドウ処理が必要です。グローバル ウィンドウは使用できません。
    • BigQuery は、共有スロットプールを使用している場合、ベスト エフォート方式でファイルを読み込みます。レコードが書き込まれてから BigQuery で使用可能になるまでに、大幅な遅延が生じる可能性があります。
    • 不正なデータやスキーマの不一致などが原因で読み込みジョブが失敗すると、パイプライン全体が失敗します。
  • 可能であれば、STORAGE_WRITE_API_AT_LEAST_ONCE の使用を検討してください。その結果、重複するレコードが BigQuery に書き込まれる可能性がありますが、STORAGE_WRITE_API よりも低コストで、スケーラビリティも高くなります。

  • 通常は STREAMING_INSERTS は使用しないでください。ストリーミング挿入には Storage Write API よりもコストがかかり、パフォーマンスも低くなります。

  • データ シャーディングにより、ストリーミング パイプラインのパフォーマンスが向上します。ほとんどのパイプラインで、自動シャーディングが出発点として適しています。ただし、次のようにシャーディングを調整できます。

    • STORAGE_WRITE_API については、withNumStorageWriteApiStreams を呼び出して、書き込みストリームの数を設定します。
    • FILE_LOADS では、withNumFileShards を呼び出してファイル シャードの数を設定します。
  • ストリーミング挿入を使用する場合は、retryTransientErrors再試行ポリシーとして設定することをおすすめします。

バッチ パイプライン

バッチ パイプラインには、次の推奨事項が適用されます。

  • ほとんどの大規模なバッチ パイプラインでは、まず FILE_LOADS を試すことをおすすめします。バッチ パイプラインは STORAGE_WRITE_API を使用できますが、大規模(1,000 個以上の vCPU)な場合や同時実行のパイプラインが実行されている場合は、割り当ての上限を超える可能性があります。Apache Beam は、バッチ STORAGE_WRITE_API ジョブの書き込みストリームの最大数をスロットリングしないため、ジョブは最終的に BigQuery Storage API の上限に達します。

  • FILE_LOADS を使用すると、共有 BigQuery スロットプールまたは予約済みスロットプールのいずれかが使い果たされる可能性があります。この種の障害が発生した場合は、次の方法をお試しください。

    • ジョブのワーカーの最大数またはワーカーのサイズを減らす。
    • 予約スロットを追加購入する。
    • STORAGE_WRITE_API の使用を検討する。
  • 小規模から中規模のパイプライン(1,000 vCPU 未満)では、STORAGE_WRITE_API を使用することが効果的な場合があります。このような小規模なジョブで、デッドレター キューが必要な場合や、FILE_LOADS 共有スロットプールが不十分な場合は、STORAGE_WRITE_API の使用を検討してください。

  • 重複データを許容できる場合は、STORAGE_WRITE_API_AT_LEAST_ONCE の使用を検討してください。このモードでは、重複するレコードが BigQuery に書き込まれる可能性がありますが、STORAGE_WRITE_API オプションよりも低コストです。

  • 書き込みモードは、パイプラインの特性に応じて異なるパフォーマンスを発揮する場合があります。ワークロードに最適な書き込みモードをテストして見つけてください。

行レベルのエラーを処理する

このセクションでは、不適切な形式の入力データやスキーマの不一致が原因で発生する、行レベルのエラーの処理方法について説明します。

Storage Write API の場合、書き込みできない行は別の PCollection に配置されます。このコレクションを取得するには、WriteResult オブジェクトで getFailedStorageApiInserts を呼び出します。このアプローチの例については、BigQuery にデータをストリーミングするをご覧ください。

後で処理できるように、デッドレター キューまたはテーブルにエラーを送信することをおすすめします。このパターンの詳細については、BigQueryIO デッドレター パターンをご覧ください。

FILE_LOADS でデータの読み込み中にエラーが発生した場合、読み込みジョブが失敗し、パイプラインがランタイム例外をスローします。エラーは Dataflow ログまたは BigQuery ジョブ履歴で確認できます。I/O コネクタは、個々の失敗した行に関する情報を返しません。

エラーのトラブルシューティングの詳細については、BigQuery コネクタエラーをご覧ください。

次の例では、Dataflow を使用して BigQuery に書き込む方法を示します。

既存のテーブルに書き込む

次の例では、PCollection<MyData> を BigQuery に書き込むバッチ パイプラインを作成します(MyData はカスタムデータ型です)。

BigQueryIO.write() メソッドは、書き込みオペレーションの構成に使用される BigQueryIO.Write<T> タイプを返します。詳細については、Apache Beam ドキュメントのテーブルへの書き込みをご覧ください。このコードサンプルは、既存のテーブル(CREATE_NEVER)に書き込み、新しい行をテーブル(WRITE_APPEND)に追加します。

Java

Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWrite {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to an exiting BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API));
    pipeline.run().waitUntilFinish();
  }
}

新規または既存のテーブルに書き込む

次の例では、create dispositionCREATE_IF_NEEDED に設定することで、宛先テーブルが存在しない場合に新しいテーブルを作成します。このオプションを使用する場合は、テーブル スキーマを指定する必要があります。コネクタは、新しいテーブルを作成するときにこのスキーマを使用します。

Java

Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWriteWithSchema {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Define a table schema. A schema is required for write disposition CREATE_IF_NEEDED.
    TableSchema schema = new TableSchema()
        .setFields(
            Arrays.asList(
                new TableFieldSchema()
                    .setName("user_name")
                    .setType("STRING")
                    .setMode("REQUIRED"),
                new TableFieldSchema()
                    .setName("age")
                    .setType("INT64") // Defaults to NULLABLE
            )
        );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to a new or existing BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withSchema(schema)
            .withMethod(Write.Method.STORAGE_WRITE_API)
        );
    pipeline.run().waitUntilFinish();
  }
}

BigQuery にデータをストリーミングする

次の例は、書き込みモードを STORAGE_WRITE_API に設定して、exactly-once セマンティクスを使用してデータをストリーミングする方法を示しています。

すべてのストリーミング パイプラインが exactly-once セマンティクスを必要とするわけではありません。たとえば、宛先テーブルから重複を手動で削除できる場合があります。シナリオでレコードの重複が許容される場合は、書き込みメソッドSTORAGE_API_AT_LEAST_ONCE に設定して、exactly-once セマンティクスを使用することを検討してください。通常、この方法は効率的で、ほとんどのパイプラインでレイテンシが短縮されます。

Java

Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class BigQueryStreamExactlyOnce {
  // Create a PTransform that sends simulated streaming data. In a real application, the data
  // source would be an external source, such as Pub/Sub.
  private static TestStream<String> createEventSource() {
    Instant startTime = new Instant(0);
    return TestStream.create(StringUtf8Coder.of())
        .advanceWatermarkTo(startTime)
        .addElements(
            TimestampedValue.of("Alice,20", startTime),
            TimestampedValue.of("Bob,30",
                startTime.plus(Duration.standardSeconds(1))),
            TimestampedValue.of("Charles,40",
                startTime.plus(Duration.standardSeconds(2))),
            TimestampedValue.of("Dylan,Invalid value",
                startTime.plus(Duration.standardSeconds(2))))
        .advanceWatermarkToInfinity();
  }

  public static PipelineResult main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);
    options.setStreaming(true);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Add a streaming data source.
        .apply(createEventSource())
        // Map the event data into TableRow objects.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            .via((String x) -> {
              String[] columns = x.split(",");
              return new TableRow().set("user_name", columns[0]).set("age", columns[1]);
            }))
        // Write the rows to BigQuery
        .apply(BigQueryIO.writeTableRows()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API)
            // For exactly-once processing, set the triggering frequency.
            .withTriggeringFrequency(Duration.standardSeconds(5)))
        // Get the collection of write errors.
        .getFailedStorageApiInserts()
        .apply(MapElements.into(TypeDescriptors.strings())
            // Process each error. In production systems, it's useful to write the errors to
            // another destination, such as a dead-letter table or queue.
            .via(
                x -> {
                  System.out.println("Failed insert: " + x.getErrorMessage());
                  System.out.println("Row: " + x.getRow());
                  return "";
                }));
    return pipeline.run();
  }
}

次のステップ