Dataflow から Cloud Storage に書き込む

このドキュメントでは、Apache Beam TextIO I/O コネクタを使用して、Dataflow から Cloud Storage にテキストデータを書き込む方法について説明します。

Google Cloud ライブラリの依存関係を含める

Cloud Storage で TextIO コネクタを使用するには、次の依存関係を含めます。このライブラリは、"gs://" ファイル名のスキーマ ハンドラを提供します。

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>

Python

apache-beam[gcp]==VERSION

Go

import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"

詳細については、Apache Beam SDK をインストールするをご覧ください。

Dataflow の Apache Beam I/O コネクタで gRPC を有効にする

Dataflow の Apache Beam I/O コネクタを介して、gRPC を使用して Cloud Storage に接続できます。gRPC は、Google が開発した高パフォーマンスのオープンソースのリモート プロシージャ コール(RPC)フレームワークで、Cloud Storage の操作に使用できます。

Dataflow ジョブの Cloud Storage への書き込みリクエストを高速化するには、Dataflow で Apache Beam I/O コネクタを有効にして gRPC を使用します。

コマンドライン

  1. Apache Beam SDK バージョン 2.55.0 以降を使用していることを確認します。
  2. Dataflow ジョブを実行するには、--additional-experiments=use_grpc_for_gcs パイプライン オプションを使用します。さまざまなパイプライン オプションについては、オプションのフラグをご覧ください。

Apache Beam SDK

  1. Apache Beam SDK バージョン 2.55.0 以降を使用していることを確認します。
  2. Dataflow ジョブを実行するには、--experiments=use_grpc_for_gcs パイプライン オプションを使用します。さまざまなパイプライン オプションについては、基本オプションをご覧ください。

Dataflow で Apache Beam I/O コネクタを構成して、Cloud Monitoring で gRPC 関連の指標を生成できます。gRPC 関連の指標は、次の作業に役立ちます。

  • Cloud Storage への gRPC リクエストのパフォーマンスをモニタリングして最適化する。
  • 問題のトラブルシューティングとデバッグを行う。
  • アプリの使用状況と動作に関する分析情報を取得します。

Dataflow で Apache Beam I/O コネクタを構成して gRPC 関連の指標を生成する方法については、クライアントサイド指標を使用するをご覧ください。ユースケースで指標の収集が不要な場合は、指標の収集をオプトアウトできます。手順については、クライアントサイドの指標をオプトアウトするをご覧ください。

並列処理

並列処理は主にシャードの数によって決まります。デフォルトでは、ランナーはこの値を自動的に設定します。ほとんどのパイプラインでは、デフォルトの動作を使用することをおすすめします。このドキュメントのベスト プラクティスをご覧ください。

パフォーマンス

次の表に、Cloud Storage への書き込みのパフォーマンス指標を示します。ワークロードは、Apache Beam SDK 2.49.0 for Java を使用して、1 つの e2-standard2 ワーカーで実行されています。Runner v2 は使用されていません。

1 億件のレコード | 1 KB | 1 列 スループット(バイト) スループット(要素)
書き込み 130 MBps 130,000 要素/秒

これらの指標は、単純なバッチ パイプラインに基づいています。これは I/O コネクタ間でのパフォーマンスの比較を目的としており、必ずしも実際のパイプラインを表すものではありません。Dataflow パイプラインのパフォーマンスは複雑で、VM タイプ、処理されるデータ、外部のソースとシンクのパフォーマンス、ユーザーコードに左右されます。指標は Java SDK の実行に基づくものであり、他の言語 SDK のパフォーマンス特性を表すものではありません。詳細については、Beam I/O のパフォーマンスをご覧ください。

ベスト プラクティス

  • 一般に、特定の数のシャードを設定することは避けてください。これにより、ランナーはスケールに適した値を選択できるようになります。シャードの数を調整する場合は、シャードあたり 100 MB~1 GB の範囲で書き込むことをおすすめします。ただし、最適な値はワークロードによって異なる場合があります。

  • Cloud Storage では、スケーリングにより 1 秒間に非常に大量のリクエストを処理できます。ただし、パイプラインで書き込み量が急増した場合は、1 つの Cloud Storage バケットが一時的に過負荷状態にならないように、複数のバケットへの書き込みを検討してください。

  • 一般に、Cloud Storage への書き込みは、1 回の書き込みのサイズが大きいほうが効率的になります(1 KB 以上のサイズ)。サイズの小さなレコードを多くのファイルに書き込むと、バイトあたりのパフォーマンスが低下する可能性があります。

  • ファイル名を生成する際は、負荷を分散するために、連続していないファイル名を使用することを検討してください。詳細については、命名規則を使ってキーの範囲に負荷を均等に分散するをご覧ください。

  • ファイル名で、アットマーク(@)の後に数字またはアスタリスク(*)を続けて使用しないでください。詳細については、@* と @N は予約済みのシャーディング仕様をご覧ください。

例: Cloud Storage にテキスト ファイルを書き込む

次の例では、GZIP 圧縮を使用してテキスト ファイルを書き込むバッチ パイプラインを作成します。

Java

Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BatchWriteStorage {
  public interface Options extends PipelineOptions {
    @Description("The Cloud Storage bucket to write to")
    String getBucketName();

    void setBucketName(String value);
  }

  // Write text data to Cloud Storage
  public static void main(String[] args) {
    final List<String> wordsList = Arrays.asList("1", "2", "3", "4");

    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    var pipeline = Pipeline.create(options);
    pipeline
        .apply(Create
            .of(wordsList))
        .apply(TextIO
            .write()
            .to(options.getBucketName())
            .withSuffix(".txt")
            .withCompression(Compression.GZIP)
        );
    pipeline.run().waitUntilFinish();
  }
}

入力 PCollection が無制限の場合、コレクションに対してウィンドウまたはトリガーを定義してから、TextIO.Write.withWindowedWrites を呼び出してウィンドウ処理された書き込みを指定する必要があります。

Python

Dataflow への認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。

import argparse
from typing import List

import apache_beam as beam
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self


def write_to_cloud_storage(argv: List[str] = None) -> None:
    # Parse the pipeline options passed into the application.
    class MyOptions(PipelineOptions):
        @classmethod
        # Define a custom pipeline option that specfies the Cloud Storage bucket.
        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--output", required=True)

    wordsList = ["1", "2", "3", "4"]
    options = MyOptions()

    with beam.Pipeline(options=options.view_as(PipelineOptions)) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(wordsList)
            | "Write Files" >> WriteToText(options.output, file_name_suffix=".txt")
        )

出力のパスには、バケット名とファイル名の接頭辞を含む Cloud Storage パスを指定します。たとえば、gs://my_bucket/output/file を指定すると、TextIO コネクタは my_bucket という名前の Cloud Storage バケットに書き込みます。出力ファイルには接頭辞 output/file* が付いています。

デフォルトでは、TextIO コネクタは <file-prefix>-00000-of-00001 のような命名規則を使用して出力ファイルをシャーディングします。必要に応じて、次の例に示すように、ファイル名の接尾辞と圧縮スキームを指定できます。

べき等な書き込みを行うため、Dataflow は一時ファイルに書き込みを行い、完了した一時ファイルを最終ファイルにコピーします。これらの一時ファイルの保存場所を操作するには、withTempDirectory メソッドを使用します。

次のステップ