このドキュメントでは、Apache Beam TextIO
I/O コネクタを使用して、Dataflow から Cloud Storage にテキストデータを書き込む方法について説明します。
Google Cloud ライブラリの依存関係を含める
Cloud Storage で TextIO
コネクタを使用するには、次の依存関係を含めます。このライブラリは、"gs://"
ファイル名のスキーマ ハンドラを提供します。
Java
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam.version}</version>
</dependency>
Python
apache-beam[gcp]==VERSION
Go
import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"
詳細については、Apache Beam SDK をインストールするをご覧ください。
Dataflow の Apache Beam I/O コネクタで gRPC を有効にする
Dataflow の Apache Beam I/O コネクタを介して、gRPC を使用して Cloud Storage に接続できます。gRPC は、Google が開発した高パフォーマンスのオープンソースのリモート プロシージャ コール(RPC)フレームワークで、Cloud Storage の操作に使用できます。
Dataflow ジョブの Cloud Storage への書き込みリクエストを高速化するには、Dataflow で Apache Beam I/O コネクタを有効にして gRPC を使用します。
コマンドライン
- Apache Beam SDK バージョン 2.55.0 以降を使用していることを確認します。
- Dataflow ジョブを実行するには、
--additional-experiments=use_grpc_for_gcs
パイプライン オプションを使用します。さまざまなパイプライン オプションについては、オプションのフラグをご覧ください。
Apache Beam SDK
- Apache Beam SDK バージョン 2.55.0 以降を使用していることを確認します。
-
Dataflow ジョブを実行するには、
--experiments=use_grpc_for_gcs
パイプライン オプションを使用します。さまざまなパイプライン オプションについては、基本オプションをご覧ください。
Dataflow で Apache Beam I/O コネクタを構成して、Cloud Monitoring で gRPC 関連の指標を生成できます。gRPC 関連の指標は、次の作業に役立ちます。
- Cloud Storage への gRPC リクエストのパフォーマンスをモニタリングして最適化する。
- 問題のトラブルシューティングとデバッグを行う。
- アプリの使用状況と動作に関する分析情報を取得します。
Dataflow で Apache Beam I/O コネクタを構成して gRPC 関連の指標を生成する方法については、クライアントサイド指標を使用するをご覧ください。ユースケースで指標の収集が不要な場合は、指標の収集をオプトアウトできます。手順については、クライアントサイドの指標をオプトアウトするをご覧ください。
並列処理
並列処理は主にシャードの数によって決まります。デフォルトでは、ランナーはこの値を自動的に設定します。ほとんどのパイプラインでは、デフォルトの動作を使用することをおすすめします。このドキュメントのベスト プラクティスをご覧ください。
パフォーマンス
次の表に、Cloud Storage への書き込みのパフォーマンス指標を示します。ワークロードは、Apache Beam SDK 2.49.0 for Java を使用して、1 つの e2-standard2
ワーカーで実行されています。Runner v2 は使用されていません。
1 億件のレコード | 1 KB | 1 列 | スループット(バイト) | スループット(要素) |
---|---|---|
書き込み | 130 MBps | 130,000 要素/秒 |
これらの指標は、単純なバッチ パイプラインに基づいています。これは I/O コネクタ間でのパフォーマンスの比較を目的としており、必ずしも実際のパイプラインを表すものではありません。Dataflow パイプラインのパフォーマンスは複雑で、VM タイプ、処理されるデータ、外部のソースとシンクのパフォーマンス、ユーザーコードに左右されます。指標は Java SDK の実行に基づくものであり、他の言語 SDK のパフォーマンス特性を表すものではありません。詳細については、Beam I/O のパフォーマンスをご覧ください。
ベスト プラクティス
一般に、特定の数のシャードを設定することは避けてください。これにより、ランナーはスケールに適した値を選択できるようになります。シャードの数を調整する場合は、シャードあたり 100 MB~1 GB の範囲で書き込むことをおすすめします。ただし、最適な値はワークロードによって異なる場合があります。
Cloud Storage では、スケーリングにより 1 秒間に非常に大量のリクエストを処理できます。ただし、パイプラインで書き込み量が急増した場合は、1 つの Cloud Storage バケットが一時的に過負荷状態にならないように、複数のバケットへの書き込みを検討してください。
一般に、Cloud Storage への書き込みは、1 回の書き込みのサイズが大きいほうが効率的になります(1 KB 以上のサイズ)。サイズの小さなレコードを多くのファイルに書き込むと、バイトあたりのパフォーマンスが低下する可能性があります。
ファイル名を生成する際は、負荷を分散するために、連続していないファイル名を使用することを検討してください。詳細については、命名規則を使ってキーの範囲に負荷を均等に分散するをご覧ください。
ファイル名で、アットマーク(@)の後に数字またはアスタリスク(*)を続けて使用しないでください。詳細については、@* と @N は予約済みのシャーディング仕様をご覧ください。
例: Cloud Storage にテキスト ファイルを書き込む
次の例では、GZIP 圧縮を使用してテキスト ファイルを書き込むバッチ パイプラインを作成します。
Java
Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。
入力 PCollection
が無制限の場合、コレクションに対してウィンドウまたはトリガーを定義してから、TextIO.Write.withWindowedWrites
を呼び出してウィンドウ処理された書き込みを指定する必要があります。
Python
Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。
出力のパスには、バケット名とファイル名の接頭辞を含む Cloud Storage パスを指定します。たとえば、gs://my_bucket/output/file
を指定すると、TextIO
コネクタは my_bucket
という名前の Cloud Storage バケットに書き込みます。出力ファイルには接頭辞 output/file*
が付いています。
デフォルトでは、TextIO
コネクタは <file-prefix>-00000-of-00001
のような命名規則を使用して出力ファイルをシャーディングします。必要に応じて、次の例に示すように、ファイル名の接尾辞と圧縮スキームを指定できます。
べき等な書き込みを行うため、Dataflow は一時ファイルに書き込みを行い、完了した一時ファイルを最終ファイルにコピーします。これらの一時ファイルの保存場所を操作するには、withTempDirectory
メソッドを使用します。
次のステップ
TextIO
API ドキュメントを読む。- Google 提供のテンプレートのリストを確認する。