インタラクティブ ランナーで Apache Beam ノートブックを開発する

JupyterLab ノートブックで Apache Beam インタラクティブ ランナーを使用して、次のタスクを行います。

  • パイプラインを繰り返し開発する。
  • パイプライン グラフを調べる。
  • read-eval-print-loop(REPL)ワークフローで個々の PCollections を解析する。

これらの Apache Beam ノートブックは、Vertex AI Workbench ユーザー管理ノートブックで提供されています。これは、最新のデータ サイエンスと ML フレームワークがプリインストールされたノートブック仮想マシンをホストするマネージド サービスです。Dataflow は、ユーザー管理ノートブック インスタンスのみをサポートします。

このガイドでは、Apache Beam ノートブックを導入することで実現する機能を中心に取り上げますが、ノートブックの作成方法については説明しません。Apache Beam の詳細については、Apache Beam プログラミング ガイドをご覧ください。

サポートと制限事項

  • Apache Beam ノートブックは Python のみをサポートします。
  • これらのノートブックで実行される Apache Beam パイプライン セグメントは、本番環境の Apache Beam ランナーではなく、テスト環境で実行されます。Dataflow サービスでノートブックを起動するには、Apache Beam ノートブックで作成したパイプラインをエクスポートします。詳しくは、ノートブックに作成されたパイプラインから Dataflow ジョブを起動するをご覧ください。

始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Compute Engine and Notebooks APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Compute Engine and Notebooks APIs.

    Enable the APIs

Apache Beam ノートブック インスタンスを作成する前に、Pub/Sub など、他のサービスを使用するパイプライン用に追加の API を有効にします。

有効にしていない場合、ノートブック インスタンスは、IAM プロジェクト編集者ロールを持つデフォルトの Compute Engine サービス アカウントによって実行されます。プロジェクトがサービス アカウントのロールを明示的に制限している場合は、ノートブックを実行するための十分な権限がプロジェクトに付与されていることを確認してください。たとえば、Pub/Sub トピックから読み取る場合はサブスクリプションを暗黙的に作成するので、サービス アカウントには IAM Pub/Sub 編集者のロールが必要となります。これに対して、Pub/Sub サブスクリプションから読み取る際に必要なのは、IAM Pub/Sub サブスクライバーのロールのみです。

このガイドを完了したら、今後課金が発生しないように、作成したリソースを削除します。詳しくは、クリーンアップをご覧ください。

Apache Beam ノートブック インスタンスを起動する

  1. Google Cloud コンソールで、Dataflow の [Workbench] ページに移動します。

    [ワークベンチ] に移動

  2. [ユーザー管理のノートブック] タブが表示されていることを確認します。

  3. ツールバーで、[ 新規作成] をクリックします。

  4. [環境] セクションの [環境] で、[Apache Beam] を選択します。

  5. 省略可: GPU でノートブックを実行する場合は、[マシンタイプ] セクションで GPU をサポートするマシンタイプを選択し、[NVIDIA GPU ドライバを自動的にインストールする] を選択します。詳細については、GPU プラットフォームをご覧ください。

  6. [ネットワーキング] セクションで、ノートブック VM のサブネットワークを選択します。

  7. 省略可: カスタム ノートブック インスタンスを設定する場合は、特定のプロパティでユーザー管理のノートブック インスタンスを作成するをご覧ください。

  8. [作成] をクリックします。Dataflow Workbench に、新しい Apache Beam ノートブック インスタンスが作成されます。

  9. ノートブック インスタンスが作成されると、[JupyterLab を開く] リンクがアクティブになります。[JupyterLab を開く] をクリックします。

省略可: 依存関係をインストールする

Apache Beam ノートブックには、Apache Beam と Google Cloud コネクタの依存関係がすでにインストールされています。サードパーティのライブラリに依存するカスタム コネクタやカスタム PTransforms がパイプラインに含まれている場合は、ノートブック インスタンスを作成した後にインストールできます。詳細については、ユーザー管理ノートブックのドキュメントに関する依存関係のインストールをご覧ください。

Apache Beam ノートブックの例

ユーザー管理のノートブック インスタンスを作成した後は、JupyterLab で開きます。JupyterLab サイドバーの [Files] タブの Examples フォルダには、いくつかのサンプル ノートブックが含まれています。JupyterLab ファイルの操作の詳細については、JupyterLab ユーザーガイドのファイルの操作をご覧ください。

次のノートブックを使用できます。

  • Word Count
  • Streaming Word Count
  • Streaming NYC Taxi Ride Data
  • Apache Beam SQL in notebooks with comparisons to pipelines
  • Apache Beam SQL in notebooks with the Dataflow Runner
  • ノートブックでの Apache Beam SQL
  • Dataflow Word Count
  • 大規模でインタラクティブな Flink
  • RunInference
  • Apache Beam で GPU を使用する
  • データを可視化する

Tutorials フォルダには、Apache Beam の基礎を説明する追加のチュートリアルが含まれています。次のチュートリアルが利用可能です。

  • 基本オペレーション
  • 要素ごとのオペレーション
  • 集計
  • Windows
  • IO オペレーション
  • ストリーミング
  • 最終演習

これらのノートブックには、Apache Beam のコンセプトと API の使用方法の理解を助ける説明テキストとコメント付きのコードブロックが含まれています。また、チュートリアルでは、学習したコンセプトを実践的に練習できます。

以降のセクションでは、Streaming Word Count ノートブックのサンプルコードを使用します。このガイドのコード スニペットと Streaming Word Count ノートブックのコード スニペットは、若干異なる場合があります。

ノートブック インスタンスの作成

[File] > [New] > [Notebook] に移動し、Apache Beam 2.22 以降のカーネルを選択します。

Apache Beam ノートブックは、Apache Beam SDK のマスター ブランチに対してビルドされます。つまり、ノートブック UI に表示される最新バージョンのカーネルが、最新バージョンの SDK よりも新しい可能性があります。

Apache Beam はノートブック インスタンスにインストールされているため、ノートブックには interactive_runner モジュールと interactive_beam モジュールが含まれます。

import apache_beam as beam
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
import apache_beam.runners.interactive.interactive_beam as ib

ノートブックで他の Google API を使用している場合は、次の import ステートメントを追加します。

from apache_beam.options import pipeline_options
from apache_beam.options.pipeline_options import GoogleCloudOptions
import google.auth

インタラクティブ オプションを設定する

次の行は、InteractiveRunner が無制限のソースからデータを記録するまでの時間を設定しています。この例では、期間は 10 分に設定されています。

ib.options.recording_duration = '10m'

recording_size_limit プロパティを使用して、無制限のソースの記録サイズの上限(バイト単位)を変更することもできます。

# Set the recording size limit to 1 GB.
ib.options.recording_size_limit = 1e9

その他のインタラクティブ オプションについては、interactive_beam.options クラスをご覧ください。

パイプラインを作成する

InteractiveRunner オブジェクトを使用して、パイプラインを初期化します。

options = pipeline_options.PipelineOptions(flags={})

# Set the pipeline mode to stream the data from Pub/Sub.
options.view_as(pipeline_options.StandardOptions).streaming = True

# Set the project to the default project in your current Google Cloud environment.
# The project is used to create a subscription to the Pub/Sub topic.
_, options.view_as(GoogleCloudOptions).project = google.auth.default()

p = beam.Pipeline(InteractiveRunner(), options=options)

データの読み取りと可視化

次の例は、指定された Pub/Sub トピックにサブスクリプションを作成し、そのサブスクリプションから読み取りを行う Apache Beam パイプラインを示しています。

words = p | "read" >> beam.io.ReadFromPubSub(topic="projects/pubsub-public-data/topics/shakespeare-kinglear")

パイプラインは、ソースからウィンドウごとに単語をカウントします。固定されたウィンドウを作成し、各ウィンドウの長さを 10 秒に設定します。

windowed_words = (words
   | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))

データがウィンドウ処理されると、ウィンドウごとに単語をカウントします。

windowed_word_counts = (windowed_words
   | "count" >> beam.combiners.Count.PerElement())

show() メソッドは、作成された PCollection をノートブックに可視化します。

ib.show(windowed_word_counts, include_window_info=True)

PCollection をテーブル形式で可視化する show メソッド。

nduration の 2 つのオプション パラメータを設定すると、show() から結果セットを絞り込むことができます。

  • n を設定して、結果セットが最大で n の要素数(20 など)を表示するように制限します。n が設定されていない場合、デフォルトの動作として、ソースの記録が終了するまで最新の要素が一覧表示されます。
  • duration を設定して、結果セットをソース記録の開始から指定した秒数分のデータに制限します。duration が設定されていない場合、デフォルトの動作として、記録が終了するまですべての要素が一覧表示されます。

両方のパラメータを設定した場合、いずれかがしきい値に達すると show() が停止します。次の例では、show() は記録されたソースからの最初の 30 秒に相当するデータに基づいて、計算された最大 20 個の要素を返します。

ib.show(windowed_word_counts, include_window_info=True, n=20, duration=30)

データの可視化を表示するには、visualize_data=Trueshow() メソッドに渡します。可視化には複数のフィルタを適用できます。次の可視化では、ラベルと軸でフィルタリングできます。

フィルタできる多様な UI 要素のセットとして PCollection を可視化する show メソッド。

ストリーミング パイプラインのプロトタイピングと同時に再現性を確保するため、show() メソッド呼び出しはデフォルトでキャプチャしたデータを再利用します。この動作を変更し、show() メソッドが常に新しいデータを取得するようにするには、interactive_beam.options.enable_capture_replay = False を設定します。また、2 つ目の無限ソースをノートブックに追加すると、元の無限ソースのデータは破棄されます。

Apache Beam ノートブックの可視化では、Pandas DataFrame も役立ちます。次の例では、最初に単語を小文字に変換してから、各単語の頻度を計算します。

windowed_lower_word_counts = (windowed_words
   | beam.Map(lambda word: word.lower())
   | "count" >> beam.combiners.Count.PerElement())

collect() メソッドにより、Pandas DataFrame の出力を取得できます。

ib.collect(windowed_lower_word_counts, include_window_info=True)

Pandas DataFrame 内の PCollection を表す collect メソッド。

ノートブックの開発では、セルの編集と再実行が一般的です。Apache Beam ノートブックでセルを編集して再実行した場合、元のセルでのコード処理は元に戻りません。たとえば、セルがパイプラインに PTransform を追加した場合、そのセルを再実行すると、さらに PTransform がパイプラインに追加されます。状態をクリアするには、カーネルを再起動してセルを再実行します。

Interactive Beam インスペクタでデータを可視化する

show()collect() を絶え間なく呼び出すことで、PCollection のデータの取り込みを邪魔してしまう場合があります。特に出力が画面スペースの多くを占有し、ノートブックの操作が困難になることがあります。また、複数の PCollections を並べて比較し、変換が意図したとおりに機能するかどうか検証することもできます。たとえば、一方の PCollection が変換を行い、もう一方の変換が生成される場合などです。これらのユースケースでは、Interactive Beam インスペクタを使用すると便利です。

Interactive Beam インスペクタは、Apache Beam ノートブックにプリインストールされている JupyterLab 拡張機能 apache-beam-jupyterlab-sidepanel として提供されています。この拡張機能を使用すると、show()collect() を明示的に呼び出すことなく、各 PCollection に関連付けられたパイプラインとデータの状態をインタラクティブに検査できます。

インスペクタを開く方法は 3 つあります。

  • JupyterLab の上部メニューバーで [Interactive Beam] をクリックします。プルダウンで [Open Inspector] をクリックしてインスペクタを開きます。

    メニューからインスペクタを開く

  • ランチャー ページを使用します。ランチャー ページが開いていない場合は、[File] -> [New Launcher] の順にクリックして開きます。ランチャー ページで Interactive Beam を探して [Open Inspector] をクリックし、インスペクタを開きます。

    ランチャーからインスペクタを開く

  • コマンド パレットを使用します。JupyterLab のメニューバーで、[View] > [Activate Command Palette] をクリックします。ダイアログで Interactive Beam を検索して、拡張機能のすべてのオプションを一覧表示します。[Open Inspector] をクリックしてインスペクタを開きます。

    コマンド パレットで Inspector を開く

インスペクタが開くときに:

  • 開いているノートブックが 1 つだけの場合、インスペクタは自動的にそのノートブックに接続します。

  • ノートブックが開いていない場合は、カーネルを選択するためのダイアログが表示されます。

  • 複数のノートブックが開いている場合は、ノートブック セッションを選択するためのダイアログが表示されます。

    接続するノートブックの選択

インスペクタを開く前に、少なくとも 1 つのノートブックを開いて、そのカーネルを選択することをおすすめします。ノートブックを開く前にカーネルでインスペクタを開く場合は、後でノートブックを開いてインスペクタに接続するときに、Use Kernel from Preferred Session から Interactive Beam Inspector Session を選択する必要があります。同じカーネルから作成された異なるセッションではなく、同じセッションを共有している場合は、インスペクタとノートブックが接続されます。Start Preferred Kernel から同じカーネルを選択すると、開いているノートブックまたはインスペクタの既存のセッションから独立した新しいセッションが作成されます。

開いたノートブックに複数のインスペクタを開き、ワークスペースでタブを自由にドラッグ&ドロップすることでインスペクタを並べ替えることができます。

2 つのインスペクタを開き、並べて配置

ノートブックでセルを実行すると、インスペクタ ページが自動的に更新されます。このページには、接続されたノートブックで定義されているパイプラインと PCollections が一覧表示されます。PCollections は、それが属するパイプラインごとに整理されています。ヘッダー パイプラインをクリックすると、折りたたむことができます。

パイプラインと PCollections リスト内のアイテムをクリックすると、対応する可視化データがインスペクタの右側にレンダリングされます。

  • PCollection の場合、インスペクタは、追加のウィジェットを使用してデータをレンダリングします(データがまだ制限なしの PCollections に送信されている場合は動的にレンダリングします)。[APPLY] ボタンをクリックすると、可視化が調整されます。

    インスペクタ ページ

    インスペクタと開いているノートブックは同じカーネル セッションを共有するため、互いの実行をブロックします。たとえば、ノートブックでコードの実行がビジー状態の場合、ノートブックがその実行を完了するまでインスペクタは更新されません。逆に、インスペクタが PCollection を動的に可視化しているときに、ノートブックでなんらかのコードをすぐに実行する場合は、[STOP] ボタンをクリックして可視化を停止します。これにより、処理が完了する前に、カーネルをノートブックに解放できます。

  • パイプラインである場合は、インスペクタにパイプラインのグラフが表示されます。

    インスペクタ ページ

匿名パイプラインが表示される場合があります。これらのパイプラインには PCollections があります。アクセスはできますが、メイン セッションからは参照されなくなります。次に例を示します。

p = beam.Pipeline()
pcoll = p | beam.Create([1, 2, 3])

p = beam.Pipeline()

前の例では、空のパイプライン p と、1 つの PCollection pcoll を含む匿名パイプラインを作成しています。pcoll.pipeline を使用すると、匿名パイプラインにアクセスできます。

パイプラインと PCollection リストを切り替えることで、大規模な可視化に必要なスペースを節約できます。左側リストの切り替え

パイプラインの記録ステータスについて

可視化だけでなく、describe を呼び出すことで、ノートブック インスタンス内の 1 つまたはすべてのパイプラインの記録ステータスを調べることもできます。

# Return the recording status of a specific pipeline. Leave the parameter list empty to return
# the recording status of all pipelines.
ib.recordings.describe(p)

describe() メソッドは、次の詳細情報を提供します。

  • ディスク上のパイプラインのすべての記録の合計サイズ(バイト単位)
  • バックグラウンドの記録ジョブの開始時間(Unix エポックからの秒数)
  • バックグラウンドの記録ジョブの現在のパイプライン ステータス
  • パイプラインの Python 変数

ノートブックに作成されたパイプラインから Dataflow ジョブを起動する

  1. 省略可: ノートブックを使用して Dataflow ジョブを実行する前に、カーネルを再起動し、すべてのセルを再実行して出力を確認します。この手順を省略すると、ノートブックの隠れた状態がパイプライン オブジェクトのジョブグラフに影響を及ぼす可能性があります。
  2. Dataflow API を有効にします
  3. 次の import ステートメントを追加します。

    from apache_beam.runners import DataflowRunner
    
  4. パイプライン オプションを渡します。

    # Set up Apache Beam pipeline options.
    options = pipeline_options.PipelineOptions()
    
    # Set the project to the default project in your current Google Cloud
    # environment.
    _, options.view_as(GoogleCloudOptions).project = google.auth.default()
    
    # Set the Google Cloud region to run Dataflow.
    options.view_as(GoogleCloudOptions).region = 'us-central1'
    
    # Choose a Cloud Storage location.
    dataflow_gcs_location = 'gs://<change me>/dataflow'
    
    # Set the staging location. This location is used to stage the
    # Dataflow pipeline and SDK binary.
    options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
    
    # Set the temporary location. This location is used to store temporary files
    # or intermediate results before outputting to the sink.
    options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
    
    # If and only if you are using Apache Beam SDK built from source code, set
    # the SDK location. This is used by Dataflow to locate the SDK
    # needed to run the pipeline.
    options.view_as(pipeline_options.SetupOptions).sdk_location = (
        '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-%s0.tar.gz' %
        beam.version.__version__)
    

    パラメータ値は調整できます。たとえば、region の値を us-central1 から変更できます。

  5. DataflowRunner を使用してパイプラインを実行します。この手順により、Dataflow サービスでジョブが実行されます。

    runner = DataflowRunner()
    runner.run_pipeline(p, options=options)
    

    p は、パイプラインの作成のパイプライン オブジェクトです。

インタラクティブ ノートブックでこの変換を実行する方法の例については、ノートブック インスタンスの Dataflow Word Count ノートブックを参照してください。

あるいは、ノートブックを実行可能スクリプトとしてエクスポートし、生成された .py ファイルを前の手順で変更してから、Dataflow サービスにパイプラインをデプロイすることもできます。

ノートブックを保存する

作成したノートブックは、実行中のノートブック インスタンスにローカルに保存されます。開発中にノートブック インスタンスをリセットまたはシャットダウンした場合、これらの新しいノートブックは、/home/jupyter ディレクトリの下に作成される限り保持されます。ただし、ノートブック インスタンスが削除されると、それらのノートブックも削除されます。

ノートブックを後で使用できるようにするには、ワークステーションにローカルにダウンロードするか、GitHub に保存するか、別のファイル形式にエクスポートします。

ノートブックを追加の永続ディスクに保存する

さまざまなノートブック インスタンスでノートブックやスクリプトなどの処理を保持する場合は、Persistent Disk に保存します。

  1. Persistent Disk を作成するか、アタッチします。手順に沿って ssh を使用してノートブック インスタンスの VM に接続し、開いている Cloud Shell でコマンドを発行します。

  2. Persistent Disk がマウントされているディレクトリ(/mnt/myDisk など)をメモします。

  3. ノートブック インスタンスの VM の詳細を編集し、Custom metadata に次のエントリを追加します: キー - container-custom-params、値 - -v /mnt/myDisk:/mnt/myDiskマウントされた PD をバインドするために必要な追加のメタデータ

  4. [保存] をクリックします。

  5. これらの変更を更新するには、ノートブック インスタンスをリセットします。ノートブック インスタンスをリセットする

  6. リセット後、[JupyterLab を開く] をクリックします。JupyterLab UI が利用可能になるまでに時間がかかることがあります。UI が表示されたら、ターミナルを開き、コマンド ls -al /mnt を実行します。/mnt/myDisk ディレクトリが一覧表示されます。ボリュームのバインドの一覧表示

これで、作業を /mnt/myDisk ディレクトリに保存できるようになりました。ノートブック インスタンスが削除されても、Persistent Disk はプロジェクト内に存在します。この Persistent Disk は、他のノートブック インスタンスにアタッチできます。

クリーンアップ

Apache Beam ノートブック インスタンスの使用が終了したら、ノートブック インスタンスをシャットダウンして、Google Cloud で作成したリソースをクリーンアップします。

次のステップ