Dataflow と Cloud Storage を使用して Pub/Sub からメッセージをストリーミングする
Dataflow は、ストリーミング(リアルタイム)モードのデータとバッチモードのデータを同等の信頼性と明瞭度で変換、活用するフルマネージド サービスです。Apache Beam SDK を使用して、簡素化されたパイプライン開発環境を提供します。Apache Beam SDK は、ウィンドウ処理とセッション分析のプリミティブが豊富に用意されているだけでなく、ソースとシンクのコネクタからなるエコシステムも提供しています。このクイックスタートでは、Dataflow を使用して次の操作を行う方法を説明します。
- Pub/Sub トピックにパブリッシュされたメッセージを読む
- タイムスタンプごとにメッセージをウィンドウ処理(グループ化)する
- Cloud Storage にメッセージを書き込む
このクイックスタートでは、Java と Python で Dataflow を使用する方法について説明します。SQL もサポートされます。このクイックスタートは、ご利用を開始していただくにあたり一時的な認証情報を提供する Google Cloud Skills Boost チュートリアルとしても掲載しています。
カスタムデータ処理を行う予定がない場合は、UI ベースの Dataflow テンプレートを使用することもできます。
始める前に
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler APIs:
gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant roles to the service account. Run the following command once for each of the following IAM roles:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountROLE
: the role to grant
-
Grant the required role to the principal that will attach the service account to other resources.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountUSER_EMAIL
: the email address for a Google Account
-
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler APIs:
gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant roles to the service account. Run the following command once for each of the following IAM roles:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountROLE
: the role to grant
-
Grant the required role to the principal that will attach the service account to other resources.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountUSER_EMAIL
: the email address for a Google Account
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
Pub/Sub プロジェクトを設定する
-
バケット、プロジェクト、およびリージョンの変数を作成します。 Cloud Storage のバケット名は、グローバルに一意である必要がある。 このクイックスタートでコマンドを実行する場所に近接した Dataflow リージョンを選択します。
REGION
変数の値は有効なリージョン名にする必要があります。リージョンとロケーションの詳細については、Dataflow のロケーションをご覧ください。BUCKET_NAME=BUCKET_NAME PROJECT_ID=$(gcloud config get-value project) TOPIC_ID=TOPIC_ID REGION=DATAFLOW_REGION SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
-
このプロジェクトが所有する Cloud Storage バケットを作成します。
gcloud storage buckets create gs://$BUCKET_NAME
-
このプロジェクトで Pub/Sub トピックを作成します。
gcloud pubsub topics create $TOPIC_ID
-
このプロジェクトで Cloud Scheduler ジョブを作成します。このジョブは、1 分間隔で Cloud Pub/Sub トピックにメッセージをパブリッシュします。
App Engine アプリがプロジェクトに存在しない場合は、この手順で作成されます。
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \ --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION
ジョブを開始します。
gcloud scheduler jobs run publisher-job --location=$REGION
-
次のコマンドを使用して、クイックスタート リポジトリのクローンを作成し、サンプルコード ディレクトリに移動します。
Java
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git cd java-docs-samples/pubsub/streaming-analytics
Python
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git cd python-docs-samples/pubsub/streaming-analytics pip install -r requirements.txt # Install Apache Beam dependencies
Pub/Sub から Cloud Storage へのメッセージのストリーミング
コードサンプル
このサンプルコードでは、Dataflow を使用して次のことを行います。
- Pub/Sub メッセージを読み取ります。
- パブリッシュ タイムスタンプにより、固定サイズの間隔でメッセージをウィンドウ処理(グループ化)します。
各ウィンドウのメッセージを Cloud Storage のファイルに書き込みます。
Java
Python
パイプラインの開始
パイプラインを開始するには、次のコマンドを実行します。
Java
mvn compile exec:java \ -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=$PROJECT_ID \ --region=$REGION \ --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output=gs://$BUCKET_NAME/samples/output \ --gcpTempLocation=gs://$BUCKET_NAME/temp \ --runner=DataflowRunner \ --windowSize=2 \ --serviceAccount=$SERVICE_ACCOUNT"
Python
python PubSubToGCS.py \ --project=$PROJECT_ID \ --region=$REGION \ --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output_path=gs://$BUCKET_NAME/samples/output \ --runner=DataflowRunner \ --window_size=2 \ --num_shards=2 \ --temp_location=gs://$BUCKET_NAME/temp \ --service_account_email=$SERVICE_ACCOUNT
上記のコマンドがローカルで実行され、クラウドで実行される Dataflow ジョブを起動します。コマンドが JOB_MESSAGE_DETAILED: Workers
have started successfully
を返したら、Ctrl+C
を使用してローカル プログラムを終了します。
ジョブとパイプラインの進行状況の確認
ジョブの進行状況は Dataflow コンソールで確認できます。
[ジョブの詳細] ビューを開いて、次の情報を確認します。
- ジョブの構成
- ジョブのログ
- ステージ指標
Cloud Storage に出力ファイルが表示されるまで数分間かかる場合があります。
または、以下のコマンドラインを使用して、書き出されたファイルを確認します。
gcloud storage ls gs://${BUCKET_NAME}/samples/
出力は次のようになります。
Java
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1
Python
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0 gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1
クリーンアップ
このページで使用したリソースについて、 Google Cloud アカウントに課金されないようにするには、リソースを含む Google Cloud プロジェクトを削除します。
Cloud Scheduler ジョブを削除します。
gcloud scheduler jobs delete publisher-job --location=$REGION
Dataflow コンソールで、ジョブを停止します。パイプラインをドレインせずにキャンセルします。
トピックを削除します。
gcloud pubsub topics delete $TOPIC_ID
パイプラインによって作成されたファイルを削除します。
gcloud storage rm "gs://${BUCKET_NAME}/samples/output*" --recursive --continue-on-error gcloud storage rm "gs://${BUCKET_NAME}/temp/*" --recursive --continue-on-error
Cloud Storage バケットを削除します。
gcloud storage rm gs://${BUCKET_NAME} --recursive
- サービス アカウントを削除します。
gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
次のステップ
カスタム タイムスタンプで Pub/Sub メッセージをウィンドウ処理する場合は、タイムスタンプを Pub/Sub メッセージ内の属性として指定し、そのカスタム タイムスタンプを PubsubIO の
withTimestampAttribute
で使用できます。Google のストリーミング用に設計されたオープンソースの Dataflow テンプレートをご覧ください。
Dataflow と Pub/Sub の統合方法の詳細を確認します。
Pub/Sub から読み取り、Dataflow Flex テンプレートを使用して BigQuery に書き込むこちらのチュートリアルをご覧ください。
ウィンドウ処理の詳細については、Apache Beam モバイルゲーム パイプラインの例をご覧ください。