ジョブビルダーを使用すると、カスタム バッチジョブとストリーミング Dataflow ジョブを作成できます。ジョブビルダー ジョブを Apache Beam YAML ファイルとして保存して、共有と再利用を行うこともできます。
新しいパイプラインを作成する
ジョブビルダーで新しいパイプラインを作成する手順は次のとおりです。
Google Cloud コンソールの [ジョブ] ページに移動します。
[
ビルダーからジョブを作成] をクリックします。[ジョブ名] に、ジョブの名前を入力します。
[バッチ] または [ストリーミング] を選択します。
[ストリーミング] を選択した場合は、ウィンドウ処理モードを選択します。次のように、ウィンドウの仕様を入力します。
- 固定ウィンドウ: ウィンドウ サイズ(秒単位)を入力します。
- スライディング ウィンドウ: ウィンドウ サイズとウィンドウ期間(秒単位)を入力します。
- セッション ウィンドウ: セッションのギャップ(秒単位)を入力します。
ウィンドウ処理の詳細については、ウィンドウとウィンドウ処理関数をご覧ください。
次に、以下の各セクションで説明するように、ソース、変換、シンクをパイプラインに追加します。
パイプラインにソースを追加する
パイプラインには少なくとも 1 つのソースが必要です。ジョブビルダーにはもともと、空のソースが 1 つ含まれています。ソースを構成する手順は次のとおりです。
[ソース名] ボックスにソースの名前を入力するか、デフォルトの名前を使用します。ジョブを実行すると、この名前がジョブグラフに表示されます。
[ソースタイプ] リストで、データソースのタイプを選択します。
ソースタイプに応じて、追加の構成情報を指定します。たとえば、BigQuery を選択した場合は、読み取るテーブルを指定します。
Pub/Sub を選択した場合は、メッセージ スキーマを指定します。Pub/Sub メッセージから読み取る各フィールドの名前とデータ型を入力します。パイプラインは、スキーマで指定されていないフィールドを削除します。
省略可: 一部のソースタイプでは、[ソースデータをプレビュー] をクリックして、ソースデータのプレビューを表示できます。
パイプラインに別のソースを追加するには、[ソースを追加] をクリックします。複数のソースのデータを結合するには、SQL
または Join
変換をパイプラインに追加します。
パイプラインに変換を追加する
必要に応じて、パイプラインに 1 つ以上の変換を追加します。次の変換を使用して、ソースと他の変換からのデータの操作、集計、結合を行うことができます。
変換タイプ | 説明 | Beam YAML 変換情報 |
---|---|---|
フィルタ(Python) | Python 式を使用してレコードをフィルタします。 | |
SQL 変換 | SQL ステートメントを使用してレコードを操作したり、複数の入力を結合したりします。 | |
参加 | 同じフィールドで複数の入力を結合します。 | |
フィールドのマッピング(Python) | Python の式と関数を使用して、新しいフィールドを追加するか、レコード全体を再マッピングします。 | |
フィールドのマッピング(SQL) | SQL 式を使用してレコード フィールドを追加またはマッピングします。 | |
グループ条件 |
レコードを count() や sum() などの関数と結合します。 |
|
YAML 変換:
|
Beam YAML SDK の変換を使用します。 YAML 変換の構成: YAML 変換の構成パラメータを YAML マップとして指定します。Key-Value ペアは、生成される Beam YAML 変換の構成セクションにデータを入力するために使用されます。各変換タイプでサポートされている構成パラメータについては、Beam YAML 変換のドキュメントをご覧ください。構成パラメータの例: Combinegroup_by: combine: 参加type: equalities: fields: |
|
Explode | 配列フィールドをフラット化してレコードを分割します。 |
変換を追加する手順は次のとおりです。
[変換を追加] をクリックします。
[変換] 名のボックスに、変換の名前を入力するか、デフォルトの名前を使用します。ジョブを実行すると、この名前がジョブグラフに表示されます。
[変換タイプ] リストで、変換のタイプを選択します。
変換タイプに応じて、追加の構成情報を指定します。たとえば、[フィルタ(Python)] を選択した場合は、フィルタとして使用する Python 式を入力します。
変換の入力ステップを選択します。入力ステップとは、この変換の入力データとなる出力を提供する、ソースまたは変換のことです。
パイプラインにシンクを追加する
パイプラインには少なくとも 1 つのシンクが必要です。ジョブビルダーにはもともと、空のシンクが 1 つ含まれています。シンクを構成する手順は次のとおりです。
[シンク名] ボックスにシンクの名前を入力するか、デフォルトの名前を使用します。ジョブを実行すると、この名前がジョブグラフに表示されます。
[シンクの種類] リストで、シンクの種類を選択します。
シンクの種類に応じて、追加の構成情報を指定します。たとえば、BigQuery シンクを選択した場合は、書き込み先の BigQuery テーブルを選択します。
シンクの入力ステップを選択します。入力ステップとは、この変換の入力データとなる出力を提供する、ソースまたは変換のことです。
パイプラインにさらにシンクを追加するには、[シンクを追加] をクリックします。
パイプラインを実行する
ジョブビルダーからパイプラインを実行する手順は次のとおりです。
省略可: Dataflow ジョブ オプションを設定します。[Dataflow オプション] セクションを開くには、展開矢印
をクリックします。[ジョブを実行] をクリックします。ジョブビルダーは、送信されたジョブのジョブグラフに移動します。ジョブグラフを使用して、ジョブのステータスをモニタリングできます。
起動する前にパイプラインを検証する
Python フィルタや SQL 式など、構成が複雑なパイプラインの場合は、起動する前にパイプラインの構成に構文エラーがないか確認することをおすすめします。パイプラインの構文を検証するには、次の操作を行います。
- [検証] をクリックして Cloud Shell を開き、検証サービスを開始します。
- [検証を開始] をクリックします。
- 検証中にエラーが見つかった場合は、赤色の感嘆符が表示されます。
- 検出されたエラーを修正し、[検証] をクリックして修正を確認します。エラーが検出されなかった場合は、緑色のチェックマークが表示されます。
gcloud CLI で実行する
gcloud CLI を使用して Beam YAML パイプラインを実行することもできます。gcloud CLI で Job Builder パイプラインを実行するには:
[YAML の保存] をクリックして、[YAML の保存] ウィンドウを開きます。
次のいずれかの操作を行います。
- Cloud Storage に保存するには、Cloud Storage パスを入力して [保存] をクリックします。
- ローカル ファイルをダウンロードするには、[ダウンロード] をクリックします。
シェルまたはターミナルで次のコマンドを実行します。
gcloud dataflow yaml run my-job-builder-job --yaml-pipeline-file=YAML_FILE_PATH
YAML_FILE_PATH
は、ローカルまたは Cloud Storage 内の YAML ファイルのパスに置き換えます。
次のステップ
- Dataflow ジョブ モニタリング インターフェースを使用する。
- ジョブビルダーで YAML ジョブ定義を保存して読み込む。
- Beam YAML の詳細を確認する。