Dataflow を使用して Pub/Sub Lite メッセージをストリーミングする
独自のデータ処理プログラムを作成して実行する代わりに、Dataflow を Apache Beam の Pub/Sub Lite I/O コネクタとともに使用することもできます。Dataflow は、ストリーミング(リアルタイム)モードのデータとバッチモードのデータを同等の信頼性と明瞭度で変換、活用するフルマネージド サービスです。Dataflow は、Apache Beam SDK を使用して開発されたプログラムを確実に実行します。Apache Beam SDK は、強力なステートフル処理抽象化と、他のストリーミング システムとバッチシステムへの I/O コネクタで構成された拡張可能なセットです。
このクイックスタートでは、以下を行う Apache Beam パイプラインを作成する方法を示します。
- Pub/Sub Lite からメッセージを読み取る
- タイムスタンプごとにメッセージをウィンドウ処理(グループ化)する
- Cloud Storage にメッセージを書き込む
また、次の方法についても説明します。
- Dataflow で実行するため、パイプラインを送信する
- パイプラインから Dataflow フレックス テンプレートを作成する
このチュートリアルでは Maven が必要ですが、サンプル プロジェクトを Maven から Gradle に変換することもできます。詳細については、省略可: Maven から Gradle に変換するをご覧ください。
始める前に
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging APIs:
gcloud services enable pubsublite.googleapis.com
dataflow.googleapis.com storage-api.googleapis.com logging.googleapis.com -
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant roles to the service account. Run the following command once for each of the following IAM roles:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountROLE
: the role to grant
-
Grant the required role to the principal that will attach the service account to other resources.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountUSER_EMAIL
: the email address for a Google Account
-
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging APIs:
gcloud services enable pubsublite.googleapis.com
dataflow.googleapis.com storage-api.googleapis.com logging.googleapis.com -
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant roles to the service account. Run the following command once for each of the following IAM roles:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountROLE
: the role to grant
-
Grant the required role to the principal that will attach the service account to other resources.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountUSER_EMAIL
: the email address for a Google Account
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
Pub/Sub Lite プロジェクトを設定する
Cloud Storage バケット、プロジェクト、Dataflow リージョンの変数を作成します。Cloud Storage のバケット名は、グローバルに一意にする必要があります。Dataflow リージョンは、ジョブを実行できる有効なリージョンである必要があります。リージョンとロケーションの詳細については、Dataflow のロケーションをご覧ください。
export PROJECT_ID=$(gcloud config get-value project)
export SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
export BUCKET=BUCKET_NAME
export DATAFLOW_REGION=DATAFLOW_REGION
このプロジェクトが所有する Cloud Storage バケットを作成します。
gcloud storage buckets create gs://$BUCKET
Pub/Sub Lite ゾーン Lite のトピックとサブスクリプションを作成する
ゾーン Lite Pub/Sub Lite トピックと Lite サブスクリプションを作成します。
Lite ロケーションの場合は、サポートされている Pub/Sub Lite のロケーションを選択します。また、リージョンのゾーンも指定する必要があります。例: us-central1-a
。
export TOPIC=LITE_TOPIC_ID
export SUBSCRIPTION=LITE_SUBSCRIPTION_ID
export LITE_LOCATION=LITE_LOCATION
gcloud pubsub lite-topics create $TOPIC \ --location=$LITE_LOCATION \ --partitions=1 \ --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \ --location=$LITE_LOCATION \ --topic=$TOPIC \ --starting-offset=beginning
Dataflow にメッセージをストリーミングする
クイックスタート サンプルコードをダウンロードする
クイックスタート リポジトリのクローンを作成し、サンプルコード ディレクトリに移動します。
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsublite/streaming-analytics
サンプルコード
このサンプルコードでは、Dataflow を使用して次のことを行います。
- 制限なしソースとして Pub/Sub Lite サブスクリプションからメッセージを読み取る。
- 固定時間ウィンドウとデフォルト トリガーを使用して、パブリッシュ タイムスタンプに基づいてメッセージをグループ化します。
グループ化したメッセージを Cloud Storage のファイルに書き込む。
Java
このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Java の設定手順を実施してください。
Dataflow パイプラインを開始する
Dataflow でパイプラインを開始するには、次のコマンドを実行します。
mvn compile exec:java \
-Dexec.mainClass=examples.PubsubliteToGcs \
-Dexec.args=" \
--subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
--output=gs://$BUCKET/samples/output \
--windowSize=1 \
--project=$PROJECT_ID \
--region=$DATAFLOW_REGION \
--tempLocation=gs://$BUCKET/temp \
--runner=DataflowRunner \
--serviceAccount=$SERVICE_ACCOUNT"
上記のコマンドは Dataflow ジョブを起動します。コンソール出力のリンクをたどって、Dataflow モニタリング コンソールでジョブにアクセスします。
ジョブの進行状況を確認する
Dataflow コンソールでジョブの進行状況を確認します。
[ジョブの詳細] ビューを開いて、次の情報を確認します。
- ジョブグラフ
- 実行の詳細
- ジョブの指標
Lite トピックにメッセージを公開します。
gcloud pubsub lite-topics publish $TOPIC \
--location=$LITE_LOCATION \
--message="Hello World!"
ワーカーログにメッセージが表示されるまで、数分間かかる場合があります。
次のコマンドを使用して、Cloud Storage に書き込まれたファイルを確認します。
gcloud storage ls "gs://$BUCKET/samples/"
出力は次のようになります。
gs://$BUCKET/samples/output-19:41-19:42-0-of-1
gs://$BUCKET/samples/output-19:47-19:48-0-of-1
gs://$BUCKET/samples/output-19:48-19:49-0-of-1
次のコマンドを使用して、ファイルの内容を確認します。
gcloud storage cat "gs://$BUCKET/samples/your-filename"
省略可: Dataflow テンプレートを作成する
必要に応じて、パイプラインに基づいたカスタム Dataflow フレックス テンプレートを作成できます。Dataflow テンプレートを使用すると、完全な Java 開発環境をセットアップしなくても、 Google Cloud コンソールやコマンドラインから異なる入力パラメータを使用してジョブを実行できます。
パイプラインのすべての依存関係を含むファット JAR を作成します。コマンドの実行後、
target/pubsublite-streaming-bundled-1.0.jar
が表示されます。mvn clean package -DskipTests=true
テンプレート ファイルとテンプレート コンテナ イメージの名前と場所を指定します。
export TEMPLATE_PATH="gs://$BUCKET/samples/your-template-file.json"
export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/your-template-image:latest"
カスタム Flex テンプレートを作成します。この例では、ジョブの実行に必要な仕様を含む
metadata.json
ファイルが指定されています。gcloud dataflow flex-template build $TEMPLATE_PATH \ --image-gcr-path $TEMPLATE_IMAGE \ --sdk-language "JAVA" \ --flex-template-base-image "JAVA11" \ --metadata-file "metadata.json" \ --jar "target/pubsublite-streaming-bundled-1.0.jar" \ --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="examples.PubsubliteToGcs"
カスタム Flex テンプレートを使用してジョブを実行します。
Console
ジョブ名を入力します。
Dataflow リージョンを入力します。
カスタム テンプレートを選択します。
テンプレート パスを入力します。
必要なパラメータを入力します。
[ジョブを実行] をクリックします。
gcloud
gcloud dataflow flex-template run "pubsublite-to-gcs-`date +%Y%m%d`" \
--template-file-gcs-location $TEMPLATE_PATH \
--parameters subscription="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION" \
--parameters output="gs://$BUCKET/samples/template-output" \
--parameters windowSize=1 \
--region $DATAFLOW_REGION \
--serviceAccount=$SERVICE_ACCOUNT
クリーンアップ
このページで使用したリソースに対して Google Cloud アカウントで課金されないようにするには、 Google Cloud プロジェクトとそのリソースを削除します。
Dataflow コンソールで、ジョブを停止します。パイプラインは、ドレインするのではなくキャンセルします。
トピックとサブスクリプションを削除します。
gcloud pubsub lite-topics delete $TOPIC
gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
パイプラインによって作成されたファイルを削除します。
gcloud storage rm "gs://$BUCKET/samples/*" --recursive --continue-on-error
gcloud storage rm "gs://$BUCKET/temp/*" --recursive --continue-on-error
テンプレート イメージとテンプレート ファイルが存在する場合は、それらを削除します。
gcloud container images delete $TEMPLATE_IMAGE
gcloud storage rm $TEMPLATE_PATH
Cloud Storage バケットを削除します。
gcloud storage rm gs://$BUCKET --recursive
- サービス アカウントを削除します。
gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
次のステップ
Dataflow フレックス テンプレートの構成の詳細を確認する。
Dataflow ストリーミング パイプラインを理解する。