Java を使用して Dataflow パイプラインを作成する
このドキュメントでは、 Google Cloud プロジェクトの設定、Apache Beam SDK for Java で構築されたサンプル パイプラインの作成、Dataflow サービスでのサンプル パイプラインの実行方法について説明します。このパイプラインは、Cloud Storage からテキスト ファイルを読み取り、ファイル内の一意の単語数をカウントし、単語数を Cloud Storage に書き込みます。WordCount パイプラインの概要については、Apache Beam で WordCount を使用する方法の動画をご覧ください。
このチュートリアルでは Maven が必要ですが、サンプル プロジェクトを Maven から Gradle に変換することもできます。詳細については、省略可: Maven から Gradle に変換するをご覧ください。
このタスクを Google Cloud コンソールで直接行う際の順を追ったガイダンスについては、[ガイドを表示] をクリックしてください。
始める前に
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:
gcloud services enable dataflow<wbr> compute_component<wbr> logging<wbr> storage_component<wbr> storage_api<wbr> bigquery<wbr> pubsub<wbr> datastore.googleapis.com<wbr> cloudresourcemanager.googleapis.com
-
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:
gcloud services enable dataflow<wbr> compute_component<wbr> logging<wbr> storage_component<wbr> storage_api<wbr> bigquery<wbr> pubsub<wbr> datastore.googleapis.com<wbr> cloudresourcemanager.googleapis.com
-
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
Compute Engine のデフォルト サービス アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
PROJECT_ID
は、実際のプロジェクト ID に置き換えます。PROJECT_NUMBER
は、使用するプロジェクト番号に置き換えます。プロジェクト番号を確認するには、プロジェクトを特定するに記載されている手順を行うか、gcloud projects describe
コマンドを使用します。SERVICE_ACCOUNT_ROLE
は、個々のロールに置き換えます。
-
Create a Cloud Storage bucket and configure it as follows:
-
Set the storage class to
S
(Standard)。 -
ストレージ ロケーションを次のように設定します。
US
(米国)。 -
BUCKET_NAME
は、 一意のバケット名に置き換えます。バケットの名前空間は世界中の誰でも閲覧可能です。機密情報をバケット名に含めないようにしてください。 - 次のものをコピーします。これらは以後のセクションで使用されます。
- Cloud Storage バケット名。
- 実際の Google Cloud のプロジェクト ID。ID を調べる方法については、プロジェクトの識別をご覧ください。
- Java Development Kit(JDK)バージョン 11 をダウンロードしてインストールします(Dataflow は引き続きバージョン 8 をサポートします)。
JAVA_HOME
環境変数が設定され、ご使用の JDK インストールをポイントしていることを確認します。 - ご使用のオペレーティング システムに対応した Maven のインストール ガイドに沿って、Apache Maven をダウンロードし、インストールします。
gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
-
Set the storage class to
パイプライン コードの取得
Apache Beam SDK は、データ処理パイプライン用のオープンソースのプログラミング モデルです。Apache Beam プログラムでこのようなパイプラインを定義し、パイプラインを実行する Dataflow などのランナーを選択できます。
- シェルまたはターミナルで、Maven Archetype Plugin を使用して、Apache Beam SDK の
WordCount
の例を含む Maven プロジェクトをコンピュータ上に作成します。mvn archetype:generate \ -DarchetypeGroupId=org.apache.beam \ -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \ -DarchetypeVersion=2.65.0 \ -DgroupId=org.example \ -DartifactId=word-count-beam \ -Dversion="0.1" \ -Dpackage=org.apache.beam.examples \ -DinteractiveMode=false
このコマンドにより、現在のディレクトリの下に
word-count-beam
という新しいディレクトリが作成されます。word-count-beam
ディレクトリには、シンプルなpom.xml
ファイルと、テキスト ファイル内の単語数をカウントする一連のサンプル パイプラインが含まれています。 word-count-beam
ディレクトリにpom.xml
ファイルが含まれていることを確認します。Linux または macOS
cd word-count-beam/ ls
次のような出力が表示されます。
pom.xml src
Windows
cd word-count-beam/ dir
次のような出力が表示されます。
pom.xml src
- Maven プロジェクトにサンプル パイプラインが含まれていることを確認します。
Linux または macOS
ls src/main/java/org/apache/beam/examples/
次のような出力が表示されます。
DebuggingWordCount.java WindowedWordCount.java common MinimalWordCount.java WordCount.java
Windows
dir src/main/java/org/apache/beam/examples/
次のような出力が表示されます。
DebuggingWordCount.java WindowedWordCount.java common MinimalWordCount.java WordCount.java
これらの例で使用されている Apache Beam のコンセプトの詳細については、Apache Beam WordCount の例をご覧ください。次のセクションでは、
WordCount.java
を使用します。
パイプラインをローカルで実行する
- シェルまたはターミナルで、
word-count-beam
ディレクトリからWordCount
パイプラインをローカルで実行します。mvn compile exec:java \ -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--output=counts"
出力ファイルには接頭辞
counts
があり、word-count-beam
ディレクトリに書き込まれます。出力ファイルには、入力テキストの一意の単語と、各単語の出現回数が含まれます。
Dataflow サービスでパイプラインを実行する
- シェルまたはターミナルで、
word-count-beam
ディレクトリから Dataflow サービスに対してWordCount
パイプラインをビルドして実行します。mvn -Pdataflow-runner compile exec:java \ -Dexec.mainClass=org.apache.beam.examples.WordCount \ -Dexec.args="--project=PROJECT_ID \ --gcpTempLocation=gs://BUCKET_NAME/temp/ \ --output=gs://BUCKET_NAME/output \ --runner=DataflowRunner \ --region=REGION"
次のように置き換えます。
PROJECT_ID
: 実際の Google Cloud プロジェクト IDBUCKET_NAME
: Cloud Storage バケットの名前REGION
: Dataflow リージョン(us-central1
など)
結果を表示する
Google Cloud コンソールで、Dataflow の [ジョブ] ページに移動します。
[ジョブ] に移動[ジョブ] ページには、ステータスなどの使用可能なジョブの詳細がすべて表示されます。wordcount ジョブの [ステータス] は最初は [Running] で、その後 [Succeeded] に更新されます。
Google Cloud コンソールで Cloud Storage の [バケット] ページに移動します。
[バケット] に移動[バケット] ページには、プロジェクト内のすべてのストレージ バケットが一覧表示されます。
- 作成したストレージ バケットをクリックします。
[バケットの詳細] ページに、Dataflow ジョブで作成された出力ファイルとステージング ファイルが表示されます。
クリーンアップ
このページで使用したリソースについて、 Google Cloud アカウントに課金されないようにするには、リソースを含む Google Cloud プロジェクトを削除します。
プロジェクトを削除する
課金をなくす最も簡単な方法は、クイックスタートで作成した Google Cloud プロジェクトを削除することです。
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
個々のリソースの削除
このクイックスタートで使用した Google Cloud プロジェクトを残しておく場合は、個々のリソースを削除します。
- In the Google Cloud console, go to the Cloud Storage Buckets page.
- Click the checkbox for the bucket that you want to delete.
- To delete the bucket, click Delete, and then follow the instructions.
Compute Engine のデフォルト サービス アカウントに付与したロールを取り消します。次の IAM ロールごとに次のコマンドを 1 回実行します。
roles/dataflow.admin
roles/dataflow.worker
roles/storage.objectAdmin
gcloud projects remove-iam-policy-binding PROJECT_ID \ --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \ --role=SERVICE_ACCOUNT_ROLE
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke