このドキュメントでは、BigQuery から Dataflow にデータを読み取る方法について説明します。
概要
ほとんどのユースケースでは、マネージド I/O を使用して BigQuery から読み取ることを検討してください。マネージド I/O には、自動アップグレードや一貫した構成 API などの機能があります。BigQuery から読み取るとき、マネージド I/O は直接テーブル読み取りを実行し、最適な読み取りパフォーマンスを提供します。
より高度なパフォーマンス チューニングが必要な場合は、BigQueryIO
コネクタの使用を検討してください。BigQueryIO
コネクタは、テーブルの直接読み取りと BigQuery Export ジョブからの読み取りの両方をサポートしています。また、テーブル レコードの逆シリアル化をよりきめ細かく制御できます。詳細については、このドキュメントの BigQueryIO
コネクタを使用するをご覧ください。
列の射影とフィルタリング
パイプラインが BigQuery から読み取るデータ量を減らすには、次の手法を使用します。
- 列射影では、テーブルから読み取る列のサブセットを指定します。テーブルに多数の列があり、そのサブセットのみを読み取る必要がある場合は、列投影を使用します。
- 行フィルタリングでは、テーブルに適用する述語を指定します。BigQuery の読み取りオペレーションは、フィルタに一致する行のみを返します。これにより、パイプラインによって取り込まれるデータの合計量を削減できます。
次の例では、テーブルから "user_name"
列と "age"
列を読み取り、述語 "age > 18"
と一致しない行を除外します。この例では、マネージド I/O を使用します。
Java
Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。
クエリ結果から読み取る
次の例では、マネージド I/O を使用して SQL クエリの結果を読み取ります。BigQuery の一般公開データセットに対してクエリを実行します。SQL クエリを使用して、BigQuery ビューまたはマテリアライズド ビューから読み取ることもできます。
Java
Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。
BigQueryIO
コネクタを使用する
BigQueryIO
コネクタは、次のシリアル化方法をサポートしています。
- Avro 形式のレコードとしてデータを読み取る。この方法では、Avro レコードを解析してカスタムデータ型に変換する関数を用意します。
- データを
TableRow
オブジェクトとして読み取る。この方法は、カスタムデータ型が必要ないため便利です。ただし、通常 Avro 形式のレコードを読み取る場合よりもパフォーマンスが低くなります。
このコネクタは、データの読み取りに次の 2 つのオプションをサポートしています。
- ジョブをエクスポートする。デフォルトでは、
BigQueryIO
コネクタは、テーブルデータを Cloud Storage に書き込む BigQuery エクスポート ジョブを実行します。その後、コネクタが Cloud Storage からデータを読み取ります。 - テーブルの直接読み取り。BigQuery Storage Read API を使用し、エクスポート ステップをスキップするため、このオプションはエクスポート ジョブよりも高速です。テーブルの直接読み取りを使用するには、パイプラインをビルドするときに
withMethod(Method.DIRECT_READ)
を呼び出します。
使用するオプションを選択する際は、次の点を考慮してください。
通常は、直接テーブル読み取りを使用することをおすすめします。Storage Read API は、データをエクスポートする中間手順を必要としないため、エクスポート ジョブよりもデータ パイプラインに適しています。
直接読み取りを使用すると、Storage Read API の使用量に対して料金が発生します。BigQuery の料金ページのデータ抽出の料金をご覧ください。
エクスポート ジョブに追加料金はかかりません。ただし、エクスポート ジョブには上限があります。タイミングを優先し、コストを調整できる大規模なデータ移動の場合は、直接読み取りをおすすめします。
Storage Read API には割り当て上限があります。Google Cloud 指標を使用して、割り当ての使用状況をモニタリングします。
エクスポート ジョブを使用する場合は、
--tempLocation
パイプライン オプションを設定して、エクスポートされたファイルの Cloud Storage バケットを指定します。Storage Read API を使用すると、次のようなリースの有効期限エラーとセッション タイムアウト エラーがログに表示されることがあります。
DEADLINE_EXCEEDED
Server Unresponsive
StatusCode.FAILED_PRECONDITION details = "there was an error operating on 'projects/<projectID>/locations/<location>/sessions/<sessionID>/streams/<streamID>': session
`
これらのエラーは、オペレーションがタイムアウトよりも時間がかかる場合に発生することがあります。通常は、実行時間が 6 時間を超えるパイプラインで発生します。この問題を軽減するには、ファイル エクスポートに切り替えます。
並列処理の度合いは読み取り方法によって異なります。
直接読み取り: I/O コネクタは、エクスポート リクエストのサイズに基づいて動的な数のストリームを生成します。これらのストリームを BigQuery から並行して読み取ります。
エクスポート ジョブ: BigQuery は、Cloud Storage に書き込むファイルの数を決定します。ファイルの数は、クエリとデータ量によって異なります。I/O コネクタは、エクスポートされたファイルを並行して読み取ります。
次の表に、BigQuery I/O のさまざまな読み取りオプションのパフォーマンス指標を示します。ワークロードは、Apache Beam SDK 2.49.0 for Java を使用して、1 つの e2-standard2
ワーカーで実行されています。Runner v2 は使用されていません。
1 億件のレコード | 1 KB | 1 列 | スループット(バイト) | スループット(要素) |
---|---|---|
ストレージ読み取り | 120 MBps | 88,000 要素/秒 |
Avro エクスポート | 105 MBps | 78,000 要素/秒 |
JSON のエクスポート | 110 MBps | 81,000 要素/秒 |
これらの指標は、単純なバッチ パイプラインに基づいています。これは I/O コネクタ間でのパフォーマンスの比較を目的としており、必ずしも実際のパイプラインを表すものではありません。Dataflow パイプラインのパフォーマンスは複雑で、VM タイプ、処理されるデータ、外部のソースとシンクのパフォーマンス、ユーザーコードに左右されます。指標は Java SDK の実行に基づくものであり、他の言語 SDK のパフォーマンス特性を表すものではありません。詳細については、Beam I/O のパフォーマンスをご覧ください。
例
次のコードサンプルでは、テーブルの直接読み取りで BigQueryIO
コネクタを使用します。エクスポート ジョブを使用する場合は、withMethod
の呼び出しを省略します。
Avro 形式のレコードを読み取る
この例では、BigQueryIO
コネクタを使用して Avro 形式のレコードを読み取る方法を示します。
BigQuery データを Avro 形式のレコードに読み取るには、read(SerializableFunction)
メソッドを使用します。このメソッドは、SchemaAndRecord
オブジェクトを解析し、カスタムデータ型を返すアプリケーション定義関数を使用します。コネクタからの出力はカスタムデータ型の PCollection
です。
次のコードは、BigQuery テーブルから PCollection<MyData>
を読み取ります。ここで、MyData
はアプリケーション定義クラスです。
Java
Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。
read
メソッドは SerializableFunction<SchemaAndRecord, T>
インターフェースを使用します。このインターフェースには、Avro レコードからカスタム データクラスに変換する関数が定義されています。上のコードサンプルでは、MyData.apply
メソッドがこの変換関数を実装しています。このサンプル関数は、Avro レコードの name
フィールドと age
フィールドを解析し、MyData
インスタンスを返します。
読み取る BigQuery テーブルを指定するには、前の例のように from
メソッドを呼び出します。詳細については、BigQuery I/O コネクタ ドキュメントのテーブル名をご覧ください。
TableRow
オブジェクトを読み取る
この例では、BigQueryIO
コネクタを使用して TableRow
オブジェクトを読み取る方法を示します。
readTableRows
メソッドは、BigQuery データを TableRow
オブジェクトの PCollection
に読み込みます。各 TableRow
は、テーブルデータの単一行を保持する Key-Value ペアのマップです。from
メソッドを呼び出して、読み取る BigQuery テーブルを指定します。
次のコードは、BigQuery テーブルから PCollection<TableRows>
を読み取ります。
Java
Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。
また、この例には、TableRow
ディクショナリの値にアクセスする方法も示されています。整数値は、BigQuery でエクスポートされた JSON 形式と一致するように文字列としてエンコードされます。