ジョブビルダーを使用してカスタムジョブを作成する

ジョブビルダーを使用すると、カスタム バッチジョブとストリーミング Dataflow ジョブを作成できます。ジョブビルダー ジョブを Apache Beam YAML ファイルとして保存して、共有と再利用を行うこともできます。

新しいパイプラインを作成する

ジョブビルダーで新しいパイプラインを作成する手順は次のとおりです。

  1. Google Cloud コンソールの [ジョブ] ページに移動します。

    [ジョブ] に移動

  2. [ビルダーからジョブを作成] をクリックします。

  3. [ジョブ名] に、ジョブの名前を入力します。

  4. [バッチ] または [ストリーミング] を選択します。

  5. [ストリーミング] を選択した場合は、ウィンドウ処理モードを選択します。次のように、ウィンドウの仕様を入力します。

    • 固定ウィンドウ: ウィンドウ サイズ(秒単位)を入力します。
    • スライディング ウィンドウ: ウィンドウ サイズとウィンドウ期間(秒単位)を入力します。
    • セッション ウィンドウ: セッションのギャップ(秒単位)を入力します。

    ウィンドウ処理の詳細については、ウィンドウとウィンドウ処理関数をご覧ください。

次に、以下の各セクションで説明するように、ソース、変換、シンクをパイプラインに追加します。

パイプラインにソースを追加する

パイプラインには少なくとも 1 つのソースが必要です。ジョブビルダーにはもともと、空のソースが 1 つ含まれています。ソースを構成する手順は次のとおりです。

  1. [ソース名] ボックスにソースの名前を入力するか、デフォルトの名前を使用します。ジョブを実行すると、この名前がジョブグラフに表示されます。

  2. [ソースタイプ] リストで、データソースのタイプを選択します。

  3. ソースタイプに応じて、追加の構成情報を指定します。たとえば、BigQuery を選択した場合は、読み取るテーブルを指定します。

    Pub/Sub を選択した場合は、メッセージ スキーマを指定します。Pub/Sub メッセージから読み取る各フィールドの名前とデータ型を入力します。パイプラインは、スキーマで指定されていないフィールドを削除します。

  4. 省略可: 一部のソースタイプでは、[ソースデータをプレビュー] をクリックして、ソースデータのプレビューを表示できます。

パイプラインに別のソースを追加するには、[ソースを追加] をクリックします。複数のソースのデータを結合するには、SQL または Join 変換をパイプラインに追加します。

パイプラインに変換を追加する

必要に応じて、パイプラインに 1 つ以上の変換を追加します。次の変換を使用して、ソースと他の変換からのデータの操作、集計、結合を行うことができます。

変換タイプ 説明 Beam YAML 変換情報
フィルタ(Python) Python 式を使用してレコードをフィルタします。
SQL 変換 SQL ステートメントを使用してレコードを操作したり、複数の入力を結合したりします。
参加 同じフィールドで複数の入力を結合します。
フィールドのマッピング(Python) Python の式と関数を使用して、新しいフィールドを追加するか、レコード全体を再マッピングします。
フィールドのマッピング(SQL) SQL 式を使用してレコード フィールドを追加またはマッピングします。
グループ条件 レコードを count()sum() などの関数と結合します。
YAML 変換:
  1. AssertEqual
  2. AssignTimestamps
  3. Combine
  4. 展開
  5. フィルタ
  6. Flatten
  7. 参加
  8. LogForTesting
  9. MLTransform
  10. MapToFields
  11. PyTransform
  12. WindowInfo

Beam YAML SDK の変換を使用します。

YAML 変換の構成: YAML 変換の構成パラメータを YAML マップとして指定します。Key-Value ペアは、生成される Beam YAML 変換の構成セクションにデータを入力するために使用されます。各変換タイプでサポートされている構成パラメータについては、Beam YAML 変換のドキュメントをご覧ください。構成パラメータの例:

Combine
group_by:
combine:
参加
type:
equalities:
fields:
Explode 配列フィールドをフラット化してレコードを分割します。

変換を追加する手順は次のとおりです。

  1. [変換を追加] をクリックします。

  2. [変換] 名のボックスに、変換の名前を入力するか、デフォルトの名前を使用します。ジョブを実行すると、この名前がジョブグラフに表示されます。

  3. [変換タイプ] リストで、変換のタイプを選択します。

  4. 変換タイプに応じて、追加の構成情報を指定します。たとえば、[フィルタ(Python)] を選択した場合は、フィルタとして使用する Python 式を入力します。

  5. 変換の入力ステップを選択します。入力ステップとは、この変換の入力データとなる出力を提供する、ソースまたは変換のことです。

パイプラインにシンクを追加する

パイプラインには少なくとも 1 つのシンクが必要です。ジョブビルダーにはもともと、空のシンクが 1 つ含まれています。シンクを構成する手順は次のとおりです。

  1. [シンク名] ボックスにシンクの名前を入力するか、デフォルトの名前を使用します。ジョブを実行すると、この名前がジョブグラフに表示されます。

  2. [シンクの種類] リストで、シンクの種類を選択します。

  3. シンクの種類に応じて、追加の構成情報を指定します。たとえば、BigQuery シンクを選択した場合は、書き込み先の BigQuery テーブルを選択します。

  4. シンクの入力ステップを選択します。入力ステップとは、この変換の入力データとなる出力を提供する、ソースまたは変換のことです。

  5. パイプラインにさらにシンクを追加するには、[シンクを追加] をクリックします。

パイプラインを実行する

ジョブビルダーからパイプラインを実行する手順は次のとおりです。

  1. 省略可: Dataflow ジョブ オプションを設定します。[Dataflow オプション] セクションを開くには、展開矢印 をクリックします。

  2. [ジョブを実行] をクリックします。ジョブビルダーは、送信されたジョブのジョブグラフに移動します。ジョブグラフを使用して、ジョブのステータスをモニタリングできます。

起動する前にパイプラインを検証する

Python フィルタや SQL 式など、構成が複雑なパイプラインの場合は、起動する前にパイプラインの構成に構文エラーがないか確認することをおすすめします。パイプラインの構文を検証するには、次の操作を行います。

  1. [検証] をクリックして Cloud Shell を開き、検証サービスを開始します。
  2. [検証を開始] をクリックします。
  3. 検証中にエラーが見つかった場合は、赤色の感嘆符が表示されます。
  4. 検出されたエラーを修正し、[検証] をクリックして修正を確認します。エラーが検出されなかった場合は、緑色のチェックマークが表示されます。

gcloud CLI で実行する

gcloud CLI を使用して Beam YAML パイプラインを実行することもできます。gcloud CLI で Job Builder パイプラインを実行するには:

  1. [YAML の保存] をクリックして、[YAML の保存] ウィンドウを開きます。

  2. 次のいずれかの操作を行います。

    • Cloud Storage に保存するには、Cloud Storage パスを入力して [保存] をクリックします。
    • ローカル ファイルをダウンロードするには、[ダウンロード] をクリックします。
  3. シェルまたはターミナルで次のコマンドを実行します。

      gcloud dataflow yaml run my-job-builder-job --yaml-pipeline-file=YAML_FILE_PATH
    

    YAML_FILE_PATH は、ローカルまたは Cloud Storage 内の YAML ファイルのパスに置き換えます。

次のステップ