このページでは、Dataflow ジョブのパイプライン オプションを設定する方法について説明します。これらのパイプライン オプションは、パイプラインの実行方法と実行場所、使用されるリソースを構成します。
パイプラインの実行は、Apache Beam プログラムの実行とは別のものです。記述した Apache Beam プログラムは、遅延実行用のパイプラインを構築します。つまり、このプログラムは、サポートされている Apache Beam ランナーが実行できる一連のステップを生成します。互換性のあるランナーには、Google Cloud 上の Dataflow ランナーと、ローカル環境でパイプラインを直接実行するダイレクト ランナーが含まれます。
ランタイムにパラメータを Dataflow ジョブに渡すことができます。ランタイムにパイプライン オプションを設定する方法については、パイプライン オプションの構成をご覧ください。
Apache Beam SDK でパイプライン オプションを使用する
次の SDK を使用して、Dataflow ジョブのパイプライン オプションを設定できます。
- Apache Beam SDK for Python
- Apache Beam SDK for Java
- Apache Beam SDK for Go
SDK を使用するには、Apache Beam SDK クラス PipelineOptions
を使用してパイプライン ランナーとその他の実行パラメータを設定します。
パイプライン オプションを指定するには、2 通りの方法があります。
- パイプライン オプションのリストを指定して、パイプライン オプションをプログラムで設定する。
- パイプライン コードの実行時に、コマンドラインで直接パイプライン オプションを設定する。
プログラムでパイプライン オプションを設定する
パイプライン オプションをプログラムで設定するには、PipelineOptions
オブジェクトを作成して変更します。
Java
メソッド PipelineOptionsFactory.fromArgs
を使用して PipelineOptions
オブジェクトを作成します。
例については、このページの Dataflow サンプルで起動するをご覧ください。
Python
PipelineOptions
オブジェクトを作成します。
例については、このページの Dataflow サンプルで起動するをご覧ください。
Go
PipelineOptions
を使用してパイプライン オプションをプログラムで設定することは、Apache Beam SDK for Go ではサポートされていません。Go のコマンドライン引数を使用してください。
例については、このページの Dataflow サンプルで起動するをご覧ください。
コマンドラインでパイプライン オプションを設定する
コマンドライン引数を使用して、パイプライン オプションを設定できます。
Java
次の構文例は、Java クイックスタートの WordCount
パイプラインのものです。
mvn -Pdataflow-runner compile exec:java \
-Dexec.mainClass=org.apache.beam.examples.WordCount \
-Dexec.args="--project=PROJECT_ID \
--gcpTempLocation=gs://BUCKET_NAME/temp/ \
--output=gs://BUCKET_NAME/output \
--runner=DataflowRunner \
--region=REGION"
次のように置き換えます。
PROJECT_ID
: 実際の Google Cloud プロジェクト IDBUCKET_NAME
: Cloud Storage バケットの名前REGION
: Dataflow リージョン(us-central1
)
Python
次の構文例は、Python クイックスタートの WordCount
パイプラインのものです。
python -m apache_beam.examples.wordcount \
--region DATAFLOW_REGION \
--input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://STORAGE_BUCKET/results/outputs \
--runner DataflowRunner \
--project PROJECT_ID \
--temp_location gs://STORAGE_BUCKET/tmp/
次のように置き換えます。
DATAFLOW_REGION
: Dataflow ジョブをデプロイするリージョン(例:europe-west1
)--region
フラグは、メタデータ サーバー、ローカル クライアント、または環境変数に設定されているデフォルト リージョンをオーバーライドします。STORAGE_BUCKET
: Cloud Storage バケット名PROJECT_ID
: Google Cloud プロジェクト ID
Go
次の構文例は、Go クイックスタートの WordCount
パイプラインのものです。
go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
--output gs://BUCKET_NAME/results/outputs \
--runner dataflow \
--project PROJECT_ID \
--region DATAFLOW_REGION \
--staging_location gs://BUCKET_NAME/binaries/
次のように置き換えます。
BUCKET_NAME
: Cloud Storage バケット名PROJECT_ID
: Google Cloud プロジェクト IDDATAFLOW_REGION
: Dataflow ジョブをデプロイするリージョン。例:europe-west1
--region
フラグは、メタデータ サーバー、ローカル クライアント、または環境変数に設定されているデフォルト リージョンをオーバーライドします。
試験運用版のパイプライン オプションを設定する
Java、Python、Go SDK では、experiments
パイプライン オプションを使用して、試験運用版または一般提供前の Dataflow 機能を有効にします。
プログラムで設定する
プログラムで experiments
オプションを設定するには、次の構文を使用します。
Java
PipelineOptions
オブジェクトに、次の構文を使用して experiments
オプションを追加します。この例では、テストフラグを使用してブートディスクのサイズを 80 GB に設定しています。
options.setExperiments("streaming_boot_disk_size_gb=80")
PipelineOptions
オブジェクトの作成例については、このページの Dataflow サンプルで起動するをご覧ください。
Python
PipelineOptions
オブジェクトに、次の構文を使用して experiments
オプションを追加します。この例では、テストフラグを使用してブートディスクのサイズを 80 GB に設定しています。
beam_options = PipelineOptions(
beam_args,
experiments=['streaming_boot_disk_size_gb=80'])
PipelineOptions
オブジェクトの作成例については、このページの Dataflow サンプルで起動するをご覧ください。
Go
PipelineOptions
を使用してパイプライン オプションをプログラムで設定することは、Apache Beam SDK for Go ではサポートされていません。Go のコマンドライン引数を使用してください。
コマンドラインで設定する
コマンドラインで experiments
オプションを設定するには、次の構文を使用します。
Java
この例では、テストフラグを使用してブートディスクのサイズを 80 GB に設定しています。
--experiments=streaming_boot_disk_size_gb=80
Python
この例では、テストフラグを使用してブートディスクのサイズを 80 GB に設定しています。
--experiments=streaming_boot_disk_size_gb=80
Go
この例では、テストフラグを使用してブートディスクのサイズを 80 GB に設定しています。
--experiments=streaming_boot_disk_size_gb=80
テンプレートで設定する
Dataflow テンプレートの実行時に試験運用版機能を有効にするには、--additional-experiments
フラグを使用します。
クラシック テンプレート
gcloud dataflow jobs run JOB_NAME --additional-experiments=EXPERIMENT[,...]
Flex テンプレート
gcloud dataflow flex-template run JOB_NAME --additional-experiments=EXPERIMENT[,...]
パイプライン オプション オブジェクトにアクセスする
Apache Beam プログラムで Pipeline
オブジェクトを作成するときに、PipelineOptions
を渡します。Dataflow サービスは、パイプラインを実行するときに、PipelineOptions
のコピーを各ワーカーに送信します。
Java
ProcessContext.getPipelineOptions
メソッドを使用して、ParDo
変換の DoFn
インスタンス内の PipelineOptions
にアクセスします。
Python
この機能は Apache Beam SDK for Python ではサポートされていません。
Go
beam.PipelineOptions
を使用してパイプライン オプションにアクセスします。
Dataflow で起動する
Dataflow ランナー サービスを使用して、マネージド Google Cloud リソースでジョブを実行します。Dataflow でパイプラインを実行すると、Google Cloud プロジェクトの Compute Engine リソースと Cloud Storage リソースを使用する Dataflow ジョブが作成されます。Dataflow の権限の詳細については、Dataflow のセキュリティと権限をご覧ください。
Dataflow ジョブでは、パイプライン実行中に一時ファイルを格納するために Cloud Storage を使用します。不要なストレージ費用が発生しないようにするには、Dataflow ジョブで一時的なストレージに使用するバケットで削除(復元可能)機能をオフにします。詳細については、バケットから削除(復元可能)ポリシーを削除するをご覧ください。
必須オプションを設定する
Dataflow を使用してパイプラインを実行するには、次のパイプライン オプションを設定します。
Java
project
: Google Cloud プロジェクトの ID。runner
: パイプラインを実行するパイプライン ランナー。Google Cloud で実行する場合はDataflowRunner
である必要があります。gcpTempLocation
: 多くの一時ファイルをステージングするための Dataflow 用の Cloud Storage パス。バケットを指定する場合は、事前にバケットを作成しておく必要があります。gcpTempLocation
を設定しない場合は、パイプライン オプションtempLocation
を設定できます。その場合、gcpTempLocation
はtempLocation
の値に設定されます。どちらも指定されていない場合は、デフォルトのgcpTempLocation
が作成されます。stagingLocation
: バイナリ ファイルをステージングするための Dataflow 用の Cloud Storage パス。Apache Beam SDK 2.28 以降を使用している場合は、このオプションを設定しないでください。Apache Beam SDK 2.28 以前を使用している場合は、このオプションを設定しないと、tempLocation
に指定したものがステージングのロケーションに使用されます。このオプションと
tempLocation
のどちらも指定されていない場合は、デフォルトのgcpTempLocation
が作成されます。tempLocation
が指定され、gcpTempLocation
が指定されていない場合、tempLocation
は Cloud Storage パスである必要があり、gcpTempLocation
はデフォルトで同じ値になります。tempLocation
が指定されず、gcpTempLocation
が指定されている場合は、tempLocation
に値は入力されません。
Python
project
: Google Cloud プロジェクト ID。region
: Dataflow ジョブのリージョン。runner
: パイプラインを実行するパイプライン ランナー。Google Cloud で実行する場合はDataflowRunner
である必要があります。temp_location
: パイプラインの実行中に作成される一時ジョブファイルをステージングするための Dataflow 用の Cloud Storage パス。
Go
project
: Google Cloud プロジェクト ID。region
: Dataflow ジョブのリージョン。runner
: パイプラインを実行するパイプライン ランナー。Google Cloud で実行する場合はdataflow
である必要があります。staging_location
: パイプラインの実行中に作成される一時ジョブファイルをステージングするための Dataflow 用の Cloud Storage パス。
プログラムでパイプライン オプションを設定する
次のサンプルコードは、Dataflow を使用してパイプラインを実行するために、ランナーとその他の必須オプションをプログラムで設定してパイプラインを作成する方法を示しています。
Java
// Create and set your PipelineOptions.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
// For cloud execution, set the Google Cloud project, staging location,
// and set DataflowRunner.
options.setProject("my-project-id");
options.setStagingLocation("gs://my-bucket/binaries");
options.setRunner(DataflowRunner.class);
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg', help='description')
args, beam_args = parser.parse_known_args()
# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
beam_options = PipelineOptions(
beam_args,
runner='DataflowRunner',
project='my-project-id',
job_name='unique-job-name',
temp_location='gs://my-bucket/temp',
region='us-central1')
# Note: Repeatable options like dataflow_service_options or experiments must
# be specified as a list of string(s).
# e.g. dataflow_service_options=['enable_prime']
# Create the Pipeline with the specified options.
with beam.Pipeline(options=beam_options) as pipeline:
pass # build your pipeline here.
Go
Apache Beam SDK for Go では、Go コマンドライン引数が使用されます。フラグの値を設定するには、flag.Set()
を使用します。
// Use the Go flag package to parse custom options.
flag.Parse()
// Set the required options programmatically.
// For Cloud execution, specify the Dataflow runner, Google Cloud
// project ID, region, and staging location.
// For more information about regions, see
// https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
flag.Set("runner", "dataflow")
flag.Set("project", "my-project-id")
flag.Set("region", "us-central1")
flag.Set("staging_location", "gs://my-bucket/binaries")
beam.Init()
// Create the Pipeline.
p := beam.NewPipeline()
s := p.Root()
パイプラインを作成した後で、すべてのパイプライン読み取り、変換、書き込みを指定し、パイプラインを実行します。
コマンドラインからパイプライン オプションを使用する
次の例は、コマンドラインで指定されたパイプライン オプションを使用する方法を示しています。この例では、パイプライン オプションをプログラムで設定していません。
Java
// Set your PipelineOptions to the specified command-line options
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
コマンドライン オプションを解析するには、Python argparse モジュールを使用します。
# Use Python argparse module to parse custom arguments
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
# For more details on how to use argparse, take a look at:
# https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser()
parser.add_argument(
'--input-file',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='The file path for the input text to process.')
parser.add_argument(
'--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()
# Create the Pipeline with remaining arguments.
beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
lines = (
pipeline
| 'Read files' >> beam.io.ReadFromText(args.input_file)
| 'Write files' >> beam.io.WriteToText(args.output_path))
Go
Go flag
パッケージを使用して、コマンドライン オプションを解析します。beam.Init()
を呼び出す前にオプションを解析する必要があります。この例では、output
がコマンドライン オプションです。
// Define configuration options
var (
output = flag.String("output", "", "Output file (required).")
)
// Parse options before beam.Init()
flag.Parse()
beam.Init()
// Input validation must be done after beam.Init()
if *output == "" {
log.Fatal("No output provided!")
}
p := beam.NewPipeline()
パイプラインを作成した後で、すべてのパイプライン読み取り、変換、書き込みを指定し、パイプラインを実行します。
実行モードを制御する
Apache Beam プログラムが Dataflow などのサービスでパイプラインを実行する場合、プログラムはパイプラインを非同期で実行するか、パイプラインが完了するまでブロックします。この動作は、次のガイダンスを使用して変更できます。
Java
Apache Beam Java プログラムが Dataflow などのサービスでパイプラインを実行すると、通常は非同期で実行されます。パイプラインを実行し、ジョブが完了するまで待機するには、DataflowRunner
をパイプライン ランナーとして設定し、明示的に pipeline.run().waitUntilFinish()
を呼び出します。
DataflowRunner
を使用し、pipeline.run()
から返された PipelineResult
オブジェクトで waitUntilFinish()
を呼び出すと、パイプラインは Google Cloud で実行されますが、ローカルコードはクラウドジョブが終了して最後の DataflowPipelineJob
オブジェクトが返されるまで待機します。ジョブの実行中、Dataflow サービスは待機しながらジョブ ステータスの更新とコンソール メッセージを出力します。
Python
Apache Beam Python プログラムが Dataflow などのサービスでパイプラインを実行すると、通常は非同期で実行されます。パイプラインの完了までブロックするには、ランナーの run()
メソッドから返される PipelineResult
オブジェクトの wait_until_finish()
メソッドを使用します。
Go
Apache Beam Go プログラムが Dataflow でパイプラインを実行する場合、プログラムはデフォルトで同期を行い、パイプラインが完了するまでブロックします。ブロックを行わない場合は、次の 2 つの方法があります。
Go ルーチンでジョブを開始します。
go func() { pr, err := beamx.Run(ctx, p) if err != nil { // Handle the error. } // Send beam.PipelineResult into a channel. results <- pr }() // Do other operations while the pipeline runs.
jobopts
パッケージにある--async
コマンドライン フラグを使用します。
実行の詳細の表示、進行状況のモニタリング、ジョブ完了ステータスの確認を行うには、Dataflow モニタリング インターフェースまたは Dataflow コマンドライン インターフェースを使用します。
ストリーミング ソースを使用する
Java
無限データソース(Pub/Sub など)からパイプラインで読み取る場合、パイプラインは自動的にストリーミング モードで実行されます。
Python
無限データソース(Pub/Sub など)をパイプラインで使用する場合は、streaming
オプションを true に設定する必要があります。
Go
無限データソース(Pub/Sub など)からパイプラインで読み取る場合、パイプラインは自動的にストリーミング モードで実行されます。
ストリーミング ジョブは、デフォルトで n1-standard-2
以上の Compute Engine マシンタイプを使用します。
ローカルで起動する
パイプラインをマネージド クラウド リソースで実行する代わりに、パイプラインをローカルで実行することを選択できます。ローカル実行には、テスト、デバッグ、または小さなデータセットに対するパイプラインの実行に一定のメリットがあります。たとえば、ローカル実行の場合、リモート Dataflow サービスと関連の Google Cloud プロジェクトとの依存関係が削除されます。
ローカル実行を使用する場合、ローカルメモリに十分に収まる小さなデータセットでパイプラインを実行する必要があります。Create
変換を使用して小さなメモリ内データセットを作成するか、Read
変換を使用して小さなローカル ファイルまたはリモート ファイルを操作できます。通常、ローカル実行の場合、より少ない外部依存関係でテストとデバッグをより速く簡単に実行できますが、ローカル環境で使用可能なメモリによって制限されます。
次のサンプルコードは、ローカル環境で実行するパイプラインの作成方法を示しています。
Java
// Create and set our Pipeline Options.
PipelineOptions options = PipelineOptionsFactory.create();
// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);
Python
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg')
args, beam_args = parser.parse_known_args()
# Create and set your Pipeline Options.
beam_options = PipelineOptions(beam_args)
args = beam_options.view_as(MyOptions)
with beam.Pipeline(options=beam_options) as pipeline:
lines = (
pipeline
| beam.io.ReadFromText(args.input)
| beam.io.WriteToText(args.output))
Go
// Parse options before beam.Init()
flag.Parse()
beam.Init()
p := beam.NewPipeline()
パイプラインを作成したら、それを実行します。
カスタム パイプライン オプションを作成する
標準の PipelineOptions
に加えて、独自のカスタム オプションを追加できます。Apache Beam のコマンドラインでは、同じ形式で指定されたコマンドライン引数を使用して、カスタム オプションを解析することもできます。
Java
独自のオプションを追加するには、次の例のように、各オプションのゲッター メソッドとセッター メソッドでインターフェースを定義します。
public interface MyOptions extends PipelineOptions {
String getMyCustomOption();
void setMyCustomOption(String myCustomOption);
}
Python
独自のオプションを追加するには、次の例のように add_argument()
メソッドを使用します(これは Python の標準 argparse モジュールとまったく同じ動作をします)。
from apache_beam.options.pipeline_options import PipelineOptions
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument('--input')
parser.add_argument('--output')
Go
独自のオプションを追加するには、次の例に示すように、Go フラグ パッケージを使用します。
var (
input = flag.String("input", "", "")
output = flag.String("output", "", "")
)
ユーザーが --help
をコマンドライン引数として渡したときに表示される説明と、デフォルト値を指定することもできます。
Java
次のように、アノテーションを使用して説明とデフォルト値を設定します。
public interface MyOptions extends PipelineOptions {
@Description("My custom command line argument.")
@Default.String("DEFAULT")
String getMyCustomOption();
void setMyCustomOption(String myCustomOption);
}
PipelineOptionsFactory
でインターフェースを登録してから、PipelineOptions
オブジェクトを作成するときにインターフェースを渡すことをおすすめします。PipelineOptionsFactory
でインターフェースを登録する場合、--help
でカスタム オプション インターフェースを検索し、--help
コマンドの出力に追加できます。PipelineOptionsFactory
は、カスタム オプションが他のすべての登録済みオプションと互換性があることを検証します。
次のサンプルコードは、PipelineOptionsFactory
でカスタム オプション インターフェースを登録する方法を示しています。
PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(MyOptions.class);
これで、パイプラインは --myCustomOption=value
をコマンドライン引数として受け入れることができます。
Python
説明とデフォルト値は次のように設定します。
from apache_beam.options.pipeline_options import PipelineOptions
class MyOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--input',
default='gs://dataflow-samples/shakespeare/kinglear.txt',
help='The file path for the input text to process.')
parser.add_argument(
'--output', required=True, help='The path prefix for output files.')
Go
説明とデフォルト値は次のように設定します。
var (
input = flag.String("input", "gs://MY_STORAGE_BUCKET/input", "Input for the pipeline")
output = flag.String("output", "gs://MY_STORAGE_BUCKET/output", "Output for the pipeline")
)