Python を使用して Dataflow パイプラインを作成する

このクイックスタートでは、Apache Beam SDK for Python を使用して、パイプラインを定義するプログラムを作成する方法について説明します。次に、直接ローカル ランナーまたはクラウドベースのランナー(Dataflow など)を使用してパイプラインを実行します。WordCount パイプラインの概要については、Apache Beam で WordCount を使用する方法の動画をご覧ください。


このタスクを Google Cloud コンソールで直接行う際の順を追ったガイダンスについては、[ガイドを表示] をクリックしてください。

ガイドを表示


始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  7. Create local authentication credentials for your user account:

    gcloud auth application-default login
  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  9. Install the Google Cloud CLI.
  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Make sure that billing is enabled for your Google Cloud project.

  13. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  14. Create local authentication credentials for your user account:

    gcloud auth application-default login
  15. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  16. Compute Engine のデフォルト サービス アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • PROJECT_ID は、実際のプロジェクト ID に置き換えます。
    • PROJECT_NUMBER は、使用するプロジェクト番号に置き換えます。プロジェクト番号を確認するには、プロジェクトを特定するに記載されている手順を行うか、gcloud projects describe コマンドを使用します。
    • SERVICE_ACCOUNT_ROLE は、個々のロールに置き換えます。
  17. Create a Cloud Storage bucket and configure it as follows:
    • Set the storage class to S(Standard)。
    • ストレージ ロケーションを次のように設定します。 US(米国)。
    • BUCKET_NAME は、 一意のバケット名に置き換えます。バケットの名前空間は世界中の誰でも閲覧可能です。機密情報をバケット名に含めないようにしてください。
    • gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
    • Google Cloud プロジェクト ID と Cloud Storage バケット名をコピーします。これらの値は、このドキュメントの後半で必要になります。

環境の設定

このセクションでは、コマンド プロンプトを使用して独立した Python 仮想環境を設定し、venv を使用してパイプライン プロジェクトを実行します。このプロセスでは、1 つのプロジェクトの依存関係を他のプロジェクトの依存関係から分離できます。

すぐに利用できるコマンド プロンプトがない場合は、Cloud Shell を使用できます。Cloud Shell には Python 3 用のパッケージ マネージャーがすでにインストールされているので、仮想環境の作成にスキップできます。

Python をインストールして仮想環境を作成する手順は次のとおりです。

  1. システムで Python 3 と pip が実行されていることを確認します。
    python --version
    python -m pip --version
  2. 必要であれば、Python 3 をインストールして Python 仮想環境を設定します。手順については、Python 開発環境の設定ページで「Python のインストール」と「venv の設定」のセクションをご覧ください。 Python 3.10 以降を使用している場合は、Dataflow Runner v2 を有効にする必要もあります。Runner v1 を使用するには、Python 3.9 以前を使用します。

このクイックスタートが完了したら、deactivate を実行して仮想環境を無効にできます。

Apache Beam SDK を入手する

Apache Beam SDK は、データ パイプライン用のオープンソースのプログラミング モデルです。Apache Beam プログラムでパイプラインを定義し、パイプラインを実行するランナー(Dataflow など)を選択します。

Apache Beam SDK をダウンロードしてインストールする手順は次のとおりです。

  1. 前のセクションで作成した Python 仮想環境にいることを確認します。プロンプトが <env_name> で始まっていることを確認します。env_name は仮想環境の名前です。
  2. Python wheel パッケージ標準をインストールします。
    pip install wheel
  3. Apache Beam SDK for Python の最新バージョンをインストールします。
  4. pip install 'apache-beam[gcp]'

    Microsoft Windows の場合は、次のコマンドを使用します。

    pip install apache-beam[gcp]

    接続によってはインストールに時間がかかることがあります。

パイプラインをローカルで実行する

ローカルでパイプラインがどのように実行されるかを確認するには、apache_beam パッケージに含まれている wordcount サンプル用に作成されている Python モジュールを使用します。

wordcount パイプラインの例では、次のことを行います。

  1. テキスト ファイルを入力として使用します。

    このテキスト ファイルは、リソース名 gs://dataflow-samples/shakespeare/kinglear.txt で Cloud Storage バケットに配置されます。

  2. 各行を単語に解析します。
  3. トークン化された単語の出現頻度をカウントします。

wordcount パイプラインをローカルでステージングする手順は次のとおりです。

  1. ローカル ターミナルから、wordcount の例を実行します。
    python -m apache_beam.examples.wordcount \
      --output outputs
  2. パイプラインの出力を表示します。
    more outputs*
  3. 終了するには、q キーを押します。
パイプラインをローカルで実行すると、Apache Beam プログラムのテストとデバッグを行うことができます。Apache Beam GitHubwordcount.py ソースコードを表示できます。

Dataflow サービスでパイプラインを実行する

このセクションでは、Dataflow サービスの apache_beam パッケージから wordcount サンプル パイプラインを実行します。この例では、--runner のパラメータとして DataflowRunner を指定しています。
  • パイプラインを実行します。
    python -m apache_beam.examples.wordcount \
        --region DATAFLOW_REGION \
        --input gs://dataflow-samples/shakespeare/kinglear.txt \
        --output gs://BUCKET_NAME/results/outputs \
        --runner DataflowRunner \
        --project PROJECT_ID \
        --temp_location gs://BUCKET_NAME/tmp/

    次のように置き換えます。

    • DATAFLOW_REGION: Dataflow ジョブをデプロイするリージョン(例: europe-west1

      --region フラグは、メタデータ サーバー、ローカル クライアント、または環境変数に設定されているデフォルト リージョンをオーバーライドします。

    • BUCKET_NAME: 先ほどコピーした Cloud Storage バケット名
    • PROJECT_ID: 先ほどコピーした Google Cloud プロジェクト ID

結果を表示する

Dataflow を使用してパイプラインを実行すると、結果が Cloud Storage バケットに保存されます。このセクションでは、Google Cloud Console またはローカル ターミナルを使用して、パイプラインの実行状況を確認します。

Google Cloud Console

Google Cloud Console で結果を表示する手順は次のとおりです。

  1. Google Cloud Console で、Dataflow の [ジョブ] ページに移動します。

    [ジョブ] に移動

    [ジョブ] ページには wordcount ジョブの詳細が表示されます。最初にステータスが「実行中」のジョブが表示され、その次に「完了」のジョブが表示されます。

  2. Cloud Storage バケットのページに移動します。

    [バケット] に移動

  3. プロジェクト内のバケットのリストで、前に作成したストレージ バケットをクリックします。

    wordcount ディレクトリに、ジョブによって作成された出力ファイルが表示されます。

ローカル ターミナル

ターミナルまたは Cloud Shell を使用して結果を表示します。

  1. 出力ファイルを一覧表示するには、gcloud storage ls コマンドを使用します。
    gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
  2. BUCKET_NAME は、パイプライン プログラムで使用する Cloud Storage バケットの名前で置き換えます。

  3. 出力ファイルの結果を表示するには、gcloud storage cat コマンドを使用します。
    gcloud storage cat gs://BUCKET_NAME/results/outputs*

パイプライン コードを変更する

前の例の wordcount パイプラインでは、大文字の単語と小文字の単語が区別されました。次の手順では、wordcount パイプラインで大文字と小文字が区別されないように、パイプラインを変更します。
  1. ローカルマシンで、Apache Beam GitHub リポジトリから最新の wordcount コードをダウンロードします。
  2. ローカル ターミナルから、パイプラインを実行します。
    python wordcount.py --output outputs
  3. 結果を表示します。
    more outputs*
  4. 終了するには、q キーを押します。
  5. 任意のエディタで、wordcount.py ファイルを開きます。
  6. run 関数内で、パイプラインのステップを調べます。
    counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum))

    split の後、文字列として単語に分割されます。

  7. 文字列を小文字に変換するには、split の後の行を変更します。
    counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'lowercase' >> beam.Map(str.lower)
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum)) 
    この変更により、str.lower 関数がすべての単語にマッピングされます。この行は beam.Map(lambda word: str.lower(word)) と同じです。
  8. ファイルを保存して、変更した wordcount ジョブを実行します。
    python wordcount.py --output outputs
  9. 変更されたパイプラインの結果を表示します。
    more outputs*
  10. 終了するには、q キーを押します。
  11. 変更したパイプラインを Dataflow サービスで実行します。
    python wordcount.py \
        --region DATAFLOW_REGION \
        --input gs://dataflow-samples/shakespeare/kinglear.txt \
        --output gs://BUCKET_NAME/results/outputs \
        --runner DataflowRunner \
        --project PROJECT_ID \
        --temp_location gs://BUCKET_NAME/tmp/

    次のように置き換えます。

    • DATAFLOW_REGION: Dataflow ジョブをデプロイするリージョン
    • BUCKET_NAME: Cloud Storage バケット名
    • PROJECT_ID: Google Cloud プロジェクト ID

クリーンアップ

このページで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、Google Cloud プロジェクトとそのリソースをまとめて削除してください。

  1. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets

  2. Click the checkbox for the bucket that you want to delete.
  3. To delete the bucket, click Delete, and then follow the instructions.
  4. プロジェクトを残しておく場合は、Compute Engine のデフォルトのサービス アカウントに付与したロールを取り消します。次の IAM ロールごとに次のコマンドを 1 回実行します。

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=SERVICE_ACCOUNT_ROLE
  5. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  6. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

次のステップ