Python を使用して Dataflow パイプラインを作成する
このクイックスタートでは、Apache Beam SDK for Python を使用して、パイプラインを定義するプログラムを作成する方法について説明します。次に、直接ローカル ランナーまたはクラウドベースのランナー(Dataflow など)を使用してパイプラインを実行します。WordCount パイプラインの概要については、Apache Beam で WordCount を使用する方法の動画をご覧ください。
このタスクを Google Cloud コンソールで直接行う際の順を追ったガイダンスについては、[ガイドを表示] をクリックしてください。
始める前に
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:
gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:
gcloud services enable dataflow
compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com -
Create local authentication credentials for your user account:
gcloud auth application-default login
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
Compute Engine のデフォルト サービス アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
PROJECT_ID
は、実際のプロジェクト ID に置き換えます。PROJECT_NUMBER
は、使用するプロジェクト番号に置き換えます。プロジェクト番号を確認するには、プロジェクトを特定するに記載されている手順を行うか、gcloud projects describe
コマンドを使用します。SERVICE_ACCOUNT_ROLE
は、個々のロールに置き換えます。
-
Create a Cloud Storage bucket and configure it as follows:
-
Set the storage class to
S
(Standard)。 -
ストレージ ロケーションを次のように設定します。
US
(米国)。 -
BUCKET_NAME
は、 一意のバケット名に置き換えます。バケットの名前空間は世界中の誰でも閲覧可能です。機密情報をバケット名に含めないようにしてください。 - Google Cloud プロジェクト ID と Cloud Storage バケット名をコピーします。これらの値は、このドキュメントの後半で必要になります。
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
-
Set the storage class to
環境の設定
このセクションでは、コマンド プロンプトを使用して独立した Python 仮想環境を設定し、venv を使用してパイプライン プロジェクトを実行します。このプロセスでは、1 つのプロジェクトの依存関係を他のプロジェクトの依存関係から分離できます。
すぐに利用できるコマンド プロンプトがない場合は、Cloud Shell を使用できます。Cloud Shell には Python 3 用のパッケージ マネージャーがすでにインストールされているので、仮想環境の作成にスキップできます。
Python をインストールして仮想環境を作成する手順は次のとおりです。
- システムで Python 3 と
pip
が実行されていることを確認します。python --version python -m pip --version
- 必要であれば、Python 3 をインストールして Python 仮想環境を設定します。手順については、Python 開発環境の設定ページで「Python のインストール」と「venv の設定」のセクションをご覧ください。 Python 3.10 以降を使用している場合は、Dataflow Runner v2 を有効にする必要もあります。Runner v1 を使用するには、Python 3.9 以前を使用します。
このクイックスタートが完了したら、deactivate
を実行して仮想環境を無効にできます。
Apache Beam SDK を入手する
Apache Beam SDK は、データ パイプライン用のオープンソースのプログラミング モデルです。Apache Beam プログラムでパイプラインを定義し、パイプラインを実行するランナー(Dataflow など)を選択します。
Apache Beam SDK をダウンロードしてインストールする手順は次のとおりです。
- 前のセクションで作成した Python 仮想環境にいることを確認します。プロンプトが
<env_name>
で始まっていることを確認します。env_name
は仮想環境の名前です。 - Python wheel パッケージ標準をインストールします。
pip install wheel
- Apache Beam SDK for Python の最新バージョンをインストールします。
pip install 'apache-beam[gcp]'
Microsoft Windows の場合は、次のコマンドを使用します。
pip install apache-beam[gcp]
接続によってはインストールに時間がかかることがあります。
パイプラインをローカルで実行する
ローカルでパイプラインがどのように実行されるかを確認するには、apache_beam
パッケージに含まれている wordcount
サンプル用に作成されている Python モジュールを使用します。
wordcount
パイプラインの例では、次のことを行います。
テキスト ファイルを入力として使用します。
このテキスト ファイルは、リソース名
gs://dataflow-samples/shakespeare/kinglear.txt
で Cloud Storage バケットに配置されます。- 各行を単語に解析します。
- トークン化された単語の出現頻度をカウントします。
wordcount
パイプラインをローカルでステージングする手順は次のとおりです。
- ローカル ターミナルから、
wordcount
の例を実行します。python -m apache_beam.examples.wordcount \ --output outputs
- パイプラインの出力を表示します。
more outputs*
- 終了するには、q キーを押します。
wordcount.py
ソースコードを表示できます。Dataflow サービスでパイプラインを実行する
このセクションでは、Dataflow サービスのapache_beam
パッケージから wordcount
サンプル パイプラインを実行します。この例では、--runner
のパラメータとして DataflowRunner
を指定しています。- パイプラインを実行します。
python -m apache_beam.examples.wordcount \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://BUCKET_NAME/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://BUCKET_NAME/tmp/
次のように置き換えます。
DATAFLOW_REGION
: Dataflow ジョブをデプロイするリージョン(例:europe-west1
)--region
フラグは、メタデータ サーバー、ローカル クライアント、または環境変数に設定されているデフォルト リージョンをオーバーライドします。BUCKET_NAME
: 先ほどコピーした Cloud Storage バケット名PROJECT_ID
: 先ほどコピーした Google Cloud プロジェクト ID
結果を表示する
Dataflow を使用してパイプラインを実行すると、結果が Cloud Storage バケットに保存されます。このセクションでは、Google Cloud Console またはローカル ターミナルを使用して、パイプラインの実行状況を確認します。
Google Cloud Console
Google Cloud Console で結果を表示する手順は次のとおりです。
- Google Cloud Console で、Dataflow の [ジョブ] ページに移動します。
[ジョブ] ページには
wordcount
ジョブの詳細が表示されます。最初にステータスが「実行中」のジョブが表示され、その次に「完了」のジョブが表示されます。 - Cloud Storage バケットのページに移動します。
プロジェクト内のバケットのリストで、前に作成したストレージ バケットをクリックします。
wordcount
ディレクトリに、ジョブによって作成された出力ファイルが表示されます。
ローカル ターミナル
ターミナルまたは Cloud Shell を使用して結果を表示します。
- 出力ファイルを一覧表示するには、
gcloud storage ls
コマンドを使用します。gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
- 出力ファイルの結果を表示するには、
gcloud storage cat
コマンドを使用します。gcloud storage cat gs://BUCKET_NAME/results/outputs*
BUCKET_NAME
は、パイプライン プログラムで使用する Cloud Storage バケットの名前で置き換えます。
パイプライン コードを変更する
前の例のwordcount
パイプラインでは、大文字の単語と小文字の単語が区別されました。次の手順では、wordcount
パイプラインで大文字と小文字が区別されないように、パイプラインを変更します。- ローカルマシンで、Apache Beam GitHub リポジトリから最新の
wordcount
コードをダウンロードします。 - ローカル ターミナルから、パイプラインを実行します。
python wordcount.py --output outputs
- 結果を表示します。
more outputs*
- 終了するには、q キーを押します。
- 任意のエディタで、
wordcount.py
ファイルを開きます。 run
関数内で、パイプラインのステップを調べます。counts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
split
の後、文字列として単語に分割されます。- 文字列を小文字に変換するには、
split
の後の行を変更します。 この変更により、counts = ( lines | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str)) | 'lowercase' >> beam.Map(str.lower) | 'PairWithOne' >> beam.Map(lambda x: (x, 1)) | 'GroupAndSum' >> beam.CombinePerKey(sum))
str.lower
関数がすべての単語にマッピングされます。この行はbeam.Map(lambda word: str.lower(word))
と同じです。 - ファイルを保存して、変更した
wordcount
ジョブを実行します。python wordcount.py --output outputs
- 変更されたパイプラインの結果を表示します。
more outputs*
- 終了するには、q キーを押します。
- 変更したパイプラインを Dataflow サービスで実行します。
python wordcount.py \ --region DATAFLOW_REGION \ --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output gs://BUCKET_NAME/results/outputs \ --runner DataflowRunner \ --project PROJECT_ID \ --temp_location gs://BUCKET_NAME/tmp/
次のように置き換えます。
DATAFLOW_REGION
: Dataflow ジョブをデプロイするリージョンBUCKET_NAME
: Cloud Storage バケット名PROJECT_ID
: Google Cloud プロジェクト ID
クリーンアップ
このページで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、Google Cloud プロジェクトとそのリソースをまとめて削除してください。
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click the checkbox for the bucket that you want to delete.
- To delete the bucket, click Delete, and then follow the instructions.
プロジェクトを残しておく場合は、Compute Engine のデフォルトのサービス アカウントに付与したロールを取り消します。次の IAM ロールごとに次のコマンドを 1 回実行します。
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=SERVICE_ACCOUNT_ROLE
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke