BigQuery から Dataflow に読み込む

このドキュメントでは、BigQuery から Dataflow にデータを読み取る方法について説明します。

概要

ほとんどのユースケースでは、マネージド I/O を使用して BigQuery から読み取ることを検討してください。マネージド I/O には、自動アップグレードや一貫した構成 API などの機能があります。BigQuery から読み取るとき、マネージド I/O は直接テーブル読み取りを実行し、最適な読み取りパフォーマンスを提供します。

より高度なパフォーマンス チューニングが必要な場合は、BigQueryIO コネクタの使用を検討してください。BigQueryIO コネクタは、テーブルの直接読み取りと BigQuery Export ジョブからの読み取りの両方をサポートしています。また、テーブル レコードの逆シリアル化をよりきめ細かく制御できます。詳細については、このドキュメントの BigQueryIO コネクタを使用するをご覧ください。

列の射影とフィルタリング

パイプラインが BigQuery から読み取るデータ量を減らすには、次の手法を使用します。

  • 列射影では、テーブルから読み取る列のサブセットを指定します。テーブルに多数の列があり、そのサブセットのみを読み取る必要がある場合は、列投影を使用します。
  • 行フィルタリングでは、テーブルに適用する述語を指定します。BigQuery の読み取りオペレーションは、フィルタに一致する行のみを返します。これにより、パイプラインによって取り込まれるデータの合計量を削減できます。

次の例では、テーブルから "user_name" 列と "age" 列を読み取り、述語 "age > 18" と一致しない行を除外します。この例では、マネージド I/O を使用します。

Java

Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

import com.google.common.collect.ImmutableMap;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

public class BigQueryReadWithProjectionAndFiltering {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    String tableSpec = String.format("%s:%s.%s",
        options.getProjectId(),
        options.getDatasetName(),
        options.getTableName());

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("table", tableSpec)
        .put("row_restriction", "age > 18")
        .put("fields", List.of("user_name", "age"))
        .build();

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(Managed.read(Managed.BIGQUERY).withConfig(config)).getSinglePCollection()
        .apply(MapElements
            .into(TypeDescriptors.strings())
            // Access individual fields in the row.
            .via((Row row) -> {
              String output = String.format("Name: %s, Age: %s%n",
                  row.getString("user_name"),
                  row.getInt64("age"));
              System.out.println(output);
              return output;
            }));
    pipeline.run().waitUntilFinish();
  }
}

クエリ結果から読み取る

次の例では、マネージド I/O を使用して SQL クエリの結果を読み取ります。BigQuery の一般公開データセットに対してクエリを実行します。SQL クエリを使用して、BigQuery ビューまたはマテリアライズド ビューから読み取ることもできます。

Java

Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

public class BigQueryReadFromQuery {
  public static void main(String[] args) {
    // The SQL query to run inside BigQuery.
    final String queryString =
        "SELECT repo_name as repo, COUNT(*) as count "
            + "FROM `bigquery-public-data.github_repos.sample_commits` "
            + "GROUP BY repo_name";

    // Parse the pipeline options passed into the application.
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().create();

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("query", queryString)
        .build();

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(Managed.read(Managed.BIGQUERY).withConfig(config)).getSinglePCollection()
        .apply(MapElements
            .into(TypeDescriptors.strings())
            // Access individual fields in the row.
            .via((Row row) -> {
              String output = String.format("Repo: %s, commits: %d%n",
                  row.getString("repo"),
                  row.getInt64("count"));
              System.out.println(output);
              return output;
            }));
    pipeline.run().waitUntilFinish();
  }
}

BigQueryIO コネクタを使用する

BigQueryIO コネクタは、次のシリアル化方法をサポートしています。

このコネクタは、データの読み取りに次の 2 つのオプションをサポートしています。

  • ジョブをエクスポートする。デフォルトでは、BigQueryIO コネクタは、テーブルデータを Cloud Storage に書き込む BigQuery エクスポート ジョブを実行します。その後、コネクタが Cloud Storage からデータを読み取ります。
  • テーブルの直接読み取りBigQuery Storage Read API を使用し、エクスポート ステップをスキップするため、このオプションはエクスポート ジョブよりも高速です。テーブルの直接読み取りを使用するには、パイプラインをビルドするときに withMethod(Method.DIRECT_READ) を呼び出します。

使用するオプションを選択する際は、次の点を考慮してください。

  • 通常は、直接テーブル読み取りを使用することをおすすめします。Storage Read API は、データをエクスポートする中間手順を必要としないため、エクスポート ジョブよりもデータ パイプラインに適しています。

  • 直接読み取りを使用すると、Storage Read API の使用量に対して料金が発生します。BigQuery の料金ページのデータ抽出の料金をご覧ください。

  • エクスポート ジョブに追加料金はかかりません。ただし、エクスポート ジョブには上限があります。タイミングを優先し、コストを調整できる大規模なデータ移動の場合は、直接読み取りをおすすめします。

  • Storage Read API には割り当て上限があります。Google Cloud 指標を使用して、割り当ての使用状況をモニタリングします。

  • エクスポート ジョブを使用する場合は、--tempLocation パイプライン オプションを設定して、エクスポートされたファイルの Cloud Storage バケットを指定します。

  • Storage Read API を使用すると、次のようなリースの有効期限エラーとセッション タイムアウト エラーがログに表示されることがあります。

    • DEADLINE_EXCEEDED
    • Server Unresponsive
    • StatusCode.FAILED_PRECONDITION details = "there was an error operating on 'projects/<projectID>/locations/<location>/sessions/<sessionID>/streams/<streamID>': session`

    これらのエラーは、オペレーションがタイムアウトよりも時間がかかる場合に発生することがあります。通常は、実行時間が 6 時間を超えるパイプラインで発生します。この問題を軽減するには、ファイル エクスポートに切り替えます。

  • 並列処理の度合いは読み取り方法によって異なります。

    • 直接読み取り: I/O コネクタは、エクスポート リクエストのサイズに基づいて動的な数のストリームを生成します。これらのストリームを BigQuery から並行して読み取ります。

    • エクスポート ジョブ: BigQuery は、Cloud Storage に書き込むファイルの数を決定します。ファイルの数は、クエリとデータ量によって異なります。I/O コネクタは、エクスポートされたファイルを並行して読み取ります。

次の表に、BigQuery I/O のさまざまな読み取りオプションのパフォーマンス指標を示します。ワークロードは、Apache Beam SDK 2.49.0 for Java を使用して、1 つの e2-standard2 ワーカーで実行されています。Runner v2 は使用されていません。

1 億件のレコード | 1 KB | 1 列 スループット(バイト) スループット(要素)
ストレージ読み取り 120 MBps 88,000 要素/秒
Avro エクスポート 105 MBps 78,000 要素/秒
JSON のエクスポート 110 MBps 81,000 要素/秒

これらの指標は、単純なバッチ パイプラインに基づいています。これは I/O コネクタ間でのパフォーマンスの比較を目的としており、必ずしも実際のパイプラインを表すものではありません。Dataflow パイプラインのパフォーマンスは複雑で、VM タイプ、処理されるデータ、外部のソースとシンクのパフォーマンス、ユーザーコードに左右されます。指標は Java SDK の実行に基づくものであり、他の言語 SDK のパフォーマンス特性を表すものではありません。詳細については、Beam I/O のパフォーマンスをご覧ください。

次のコードサンプルでは、テーブルの直接読み取りで BigQueryIO コネクタを使用します。エクスポート ジョブを使用する場合は、withMethod の呼び出しを省略します。

Avro 形式のレコードを読み取る

この例では、BigQueryIO コネクタを使用して Avro 形式のレコードを読み取る方法を示します。

BigQuery データを Avro 形式のレコードに読み取るには、read(SerializableFunction) メソッドを使用します。このメソッドは、SchemaAndRecord オブジェクトを解析し、カスタムデータ型を返すアプリケーション定義関数を使用します。コネクタからの出力はカスタムデータ型の PCollection です。

次のコードは、BigQuery テーブルから PCollection<MyData> を読み取ります。ここで、MyData はアプリケーション定義クラスです。

Java

Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadAvro {

  // A custom datatype to hold a record from the source table.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    // Function to convert Avro records to MyData instances.
    public static class FromSchemaAndRecord
            implements SerializableFunction<SchemaAndRecord, MyData> {
      @Override public MyData apply(SchemaAndRecord elem) {
        MyData data = new MyData();
        GenericRecord record = elem.getRecord();
        data.name = ((Utf8) record.get("user_name")).toString();
        data.age = (Long) record.get("age");
        return data;
      }
    }
  }

  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into Avro records, using an application-defined parsing function.
        .apply(BigQueryIO.read(new MyData.FromSchemaAndRecord())
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(TypedRead.Method.DIRECT_READ))
        // The output from the previous step is a PCollection<MyData>.
        .apply(MapElements
            .into(TypeDescriptor.of(MyData.class))
            .via((MyData x) -> {
              System.out.printf("Name: %s, Age: %d%n", x.name, x.age);
              return x;
            }));
    pipeline.run().waitUntilFinish();
  }
}

read メソッドは SerializableFunction<SchemaAndRecord, T> インターフェースを使用します。このインターフェースには、Avro レコードからカスタム データクラスに変換する関数が定義されています。上のコードサンプルでは、MyData.apply メソッドがこの変換関数を実装しています。このサンプル関数は、Avro レコードの name フィールドと age フィールドを解析し、MyData インスタンスを返します。

読み取る BigQuery テーブルを指定するには、前の例のように from メソッドを呼び出します。詳細については、BigQuery I/O コネクタ ドキュメントのテーブル名をご覧ください。

TableRow オブジェクトを読み取る

この例では、BigQueryIO コネクタを使用して TableRow オブジェクトを読み取る方法を示します。

readTableRows メソッドは、BigQuery データを TableRow オブジェクトの PCollection に読み込みます。各 TableRow は、テーブルデータの単一行を保持する Key-Value ペアのマップです。from メソッドを呼び出して、読み取る BigQuery テーブルを指定します。

次のコードは、BigQuery テーブルから PCollection<TableRows> を読み取ります。

Java

Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BiqQueryReadTableRows {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into TableRow objects.
        .apply(BigQueryIO.readTableRows()
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(Method.DIRECT_READ)
        )
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            // Use TableRow to access individual fields in the row.
            .via((TableRow row) -> {
              var name = (String) row.get("user_name");
              var age = (String) row.get("age");
              System.out.printf("Name: %s, Age: %s%n", name, age);
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

また、この例には、TableRow ディクショナリの値にアクセスする方法も示されています。整数値は、BigQuery でエクスポートされた JSON 形式と一致するように文字列としてエンコードされます。

次のステップ