このチュートリアルでは、Pub/Sub Subscription to BigQuery テンプレートを使用し、 Google Cloud コンソールまたは Google Cloud CLI で Dataflow テンプレート ジョブを作成して実行します。このチュートリアルでは、Pub/Sub から JSON エンコードのメッセージを読み取り、BigQuery テーブルに書き込むストリーミング パイプラインの例を確認します。
ストリーミング分析とデータ統合パイプラインでは、Pub/Sub を使用してデータの取り込みと配信を行います。Pub/Sub を使用すると、イベント プロデューサー(パブリッシャー)とコンシューマー(サブスクライバー)のシステムを作成できます。パブリッシャーはイベントを Pub/Sub サービスに非同期に送信します。Pub/Sub は、イベントに反応する必要のあるすべてのサービスにイベントを配信します。
Dataflow は、ストリーミング(リアルタイム)モードとバッチモードの両方でデータの変換と拡充を行うためのフルマネージド型のサービスです。Apache Beam SDK を使用して受信データを変換し、変換したデータを出力する簡素化されたパイプライン開発環境を提供します。
このワークフローの利点は、BigQuery に書き込まれる前にメッセージ データを UDF を使用して変換できることです。
このシナリオで Dataflow パイプラインを実行する前に、UDF を使用した Pub/Sub BigQuery サブスクリプションが要件を満たしているかどうかを検討してください。
目標
- Pub/Sub トピックを作成する。
- テーブルとスキーマを使用して BigQuery データセットを作成する。
- Dataflow と Google 提供のストリーミング テンプレートを使用して、Pub/Sub サブスクリプションから BigQuery にデータをストリーミングする。
費用
このドキュメントでは、課金対象である次の Google Cloudコンポーネントを使用します。
- Dataflow
- Pub/Sub
- Cloud Storage
- BigQuery
料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。
このドキュメントに記載されているタスクの完了後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。
始める前に
このセクションでは、プロジェクトを選択して API を有効にし、ユーザー アカウントとワーカー サービス アカウントに適切なロールを付与する方法について説明します。
コンソール
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.
このチュートリアルの手順を完了するには、ユーザー アカウントにサービス アカウント ユーザーのロールが必要です。Compute Engine のデフォルトのサービス アカウントには、Dataflow ワーカー、Dataflow 管理者、Pub/Sub 編集者、Storage オブジェクト管理者、BigQuery データ編集者のロールが付与されている必要があります。 Google Cloud コンソールで、必要なロールを追加するには:
Google Cloud コンソールで、[IAM] ページに移動します。
[IAM] に移動- プロジェクトを選択します。
- ユーザー アカウントを含む行で、「プリンシパルを編集します」アイコン をクリックし、[ 別のロールを追加] をクリックします。
- プルダウン リストで、[サービス アカウント ユーザー] のロールを選択します。
- Compute Engine のデフォルトのサービス アカウントを含む行で、「プリンシパルを編集します」アイコン をクリックし、[ 別のロールを追加] をクリックします。
- プルダウン リストで、[Dataflow ワーカー] ロールを選択します。
この手順を Dataflow 管理者、Pub/Sub 編集者、Storage オブジェクト管理者、BigQuery データ編集者ロールに対して繰り返し、[保存] をクリックします。
ロール付与の詳細については、コンソールを使用して IAM ロールを付与するをご覧ください。
gcloud
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com -
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
-
Install the Google Cloud CLI.
-
If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com -
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
-
Compute Engine のデフォルト サービス アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。
roles/dataflow.admin
roles/dataflow.worker
roles/storage.admin
roles/pubsub.editor
roles/bigquery.dataEditor
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
次のように置き換えます。
PROJECT_ID
: プロジェクト ID。PROJECT_NUMBER
: プロジェクトの番号。プロジェクト番号を確認するには、gcloud projects describe
コマンドを使用します。SERVICE_ACCOUNT_ROLE
: 個々のロール。
Cloud Storage バケットを作成する
まず、 Google Cloud コンソールまたは Google Cloud CLI を使用して Cloud Storage バケットを作成します。Dataflow パイプラインは、このバケットを一時ストレージの場所として使用します。
コンソール
Google Cloud コンソールで Cloud Storage の [バケット] ページに移動します。
[作成] をクリックします。
[バケットの作成] ページの [バケットに名前を付ける] で、バケット名の要件を満たす名前を入力します。Cloud Storage のバケット名は、グローバルに一意にする必要があります。他のオプションは選択しないでください。
[作成] をクリックします。
gcloud
gcloud storage buckets create
コマンドを実行します。
gcloud storage buckets create gs://BUCKET_NAME
BUCKET_NAME
は、バケットの命名要件を満たす Cloud Storage バケットの名前に置き換えます。Cloud Storage のバケット名は、グローバルに一意である必要がある。
Pub/Sub トピックとサブスクリプションを作成する
Pub/Sub トピックを作成し、そのトピックに対するサブスクリプションを作成します。
Console
トピックを作成するには、次の手順で操作します。
Google Cloud コンソールで、Pub/Sub の [トピック] ページに移動します。
[トピックを作成] をクリックします。
[トピック ID] フィールドに、トピックの ID を入力します。トピックの指定方法については、トピックまたはサブスクリプションの指定方法のガイドラインをご覧ください。
[デフォルトのサブスクリプションを追加する] オプションは、そのまま保持します。他のオプションは選択しないでください。
[作成] をクリックします。
- トピックの詳細ページの [サブスクリプション ID] に、作成されたサブスクリプションの名前が表示されます。この値をメモしておきます。
gcloud
トピックを作成するには、gcloud pubsub topics create
コマンドを実行します。サブスクリプションの指定方法については、トピックまたはサブスクリプションの指定方法のガイドラインをご覧ください。
gcloud pubsub topics create TOPIC_ID
TOPIC_ID
は、Pub/Sub トピックの名前に置き換えます。
トピックのサブスクリプションを作成するには、gcloud pubsub subscriptions create
コマンドを実行します。
gcloud pubsub subscriptions create --topic TOPIC_ID SUBSCRIPTION_ID
SUBSCRIPTION_ID
は、Pub/Sub サブスクリプションの名前に置き換えます。
BigQuery テーブルを作成する
この手順では、次のスキーマを持つ BigQuery テーブルを作成します。
列名 | データ型 |
---|---|
name |
STRING |
customer_id |
INTEGER |
BigQuery データセットがまだない場合は、まず作成します。詳細については、データセットの作成をご覧ください。次に、新しい空のテーブルを作成します。
コンソール
[BigQuery] ページに移動します。
[エクスプローラ] ペインでプロジェクトを開き、データセットを選択します。
[データセット情報] セクションで、[
テーブルを作成] をクリックします。[テーブルの作成元] リストで [空のテーブル] を選択します。
[テーブル] ボックスにテーブルの名前を入力します。
[スキーマ] セクションで [テキストとして編集] をクリックします。
次のスキーマ定義を貼り付けます。
name:STRING, customer_id:INTEGER
[テーブルを作成] をクリックします。
gcloud
bq mk
コマンドを使用します。
bq mk --table \
PROJECT_ID:DATASET_NAME.TABLE_NAME \
name:STRING,customer_id:INTEGER
次のように置き換えます。
PROJECT_ID
: プロジェクト IDDATASET_NAME
: データセットの名前。TABLE_NAME
: 作成するテーブルの名前。
パイプラインを実行する
Google 提供の Pub/Sub Subscription to BigQuery テンプレートを使用して、ストリーミング パイプラインを実行します。このパイプラインは Pub/Sub トピックから受信データを取得し、BigQuery データセットに出力します。
コンソール
Google Cloud コンソールで、Dataflow の [ジョブ] ページに移動します。
[テンプレートからジョブを作成] をクリックします。
Dataflow ジョブの [ジョブ名] を入力します。
[リージョン エンドポイント] で、Dataflow ジョブのリージョンを選択します。
[Dataflow テンプレート] で、Pub/Sub Subscription to BigQuery テンプレートを選択します。
[BigQuery 出力テーブル] で [参照] を選択し、BigQuery テーブルを選択します。
Pub/Sub の入力サブスクリプションのリストで、Pub/Sub サブスクリプションを選択します。
[一時的な場所] に、次のように入力します。
gs://BUCKET_NAME/temp/
BUCKET_NAME
は Cloud Storage バケットの名前で置き換えます。temp
フォルダには、Dataflow ジョブの一時ファイルが保存されます。[ジョブを実行] をクリックします。
gcloud
シェルまたはターミナルでテンプレートを実行するには、gcloud dataflow jobs run
コマンドを使用します。
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates-DATAFLOW_REGION/latest/PubSub_Subscription_to_BigQuery \
--region DATAFLOW_REGION \
--staging-location gs://BUCKET_NAME/temp \
--parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME
次の変数を置き換えます。
JOB_NAME
: ジョブの名前DATAFLOW_REGION
: ジョブのリージョンPROJECT_ID
: Google Cloud プロジェクトの名前SUBSCRIPTION_ID
: Pub/Sub サブスクリプションの名前DATASET_NAME
: BigQuery データセットの名前TABLE_NAME
: BigQuery テーブルの名前
Pub/Sub にメッセージをパブリッシュする
Dataflow ジョブの開始後に Pub/Sub にメッセージをパブリッシュすると、パイプラインがメッセージを BigQuery に書き込みます。
コンソール
Google Cloud コンソールで、[Pub/Sub] > [トピック] ページに移動します。
トピックのリストで、トピックの名前をクリックします。
[メッセージ] をクリックします。
[メッセージをパブリッシュ] をクリックします。
[メッセージの数] に「
10
」と入力します。[メッセージ本文] に「
{"name": "Alice", "customer_id": 1}
」と入力します。[公開] をクリックします。
gcloud
メッセージをトピックにパブリッシュするには、gcloud pubsub topics publish
コマンドを使用します。
for run in {1..10}; do
gcloud pubsub topics publish TOPIC_ID --message='{"name": "Alice", "customer_id": 1}'
done
TOPIC_ID
は、実際のプロジェクト名で置き換えます。
結果を表示する
BigQuery テーブルに書き込まれたデータを表示します。テーブルにデータが表示されるまで、最大で 1 分かかることがあります。
コンソール
Google Cloud コンソールで、[BigQuery] ページに移動します。
[BigQuery] ページに移動[クエリエディタ] ペインで、次のクエリを実行します。
SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME` LIMIT 1000
次の変数を置き換えます。
PROJECT_ID
: Google Cloudプロジェクトの名前DATASET_NAME
: BigQuery データセットの名前TABLE_NAME
: BigQuery テーブルの名前
gcloud
次のクエリを実行して、BigQuery の結果を確認します。
bq query --use_legacy_sql=false 'SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`'
次の変数を置き換えます。
PROJECT_ID
: Google Cloudプロジェクトの名前DATASET_NAME
: BigQuery データセットの名前TABLE_NAME
: BigQuery テーブルの名前
UDF を使用してデータを変換する
このチュートリアルでは、Pub/Sub メッセージが JSON 形式で、BigQuery テーブル スキーマが JSON データと一致していることを前提としています。
BigQuery に書き込まれる前にデータを変換する JavaScript ユーザー定義関数(UDF)を指定することもできます。UDF は、フィルタリング、個人を特定できる情報(PII)の削除、追加のフィールドによるデータの拡充などの処理も実行できます。
詳細については、Dataflow テンプレートのユーザー定義関数を作成するをご覧ください。
デッドレター テーブルを使用する
ジョブの実行中に、パイプラインが個々のメッセージを BigQuery に書き込めないことがあります。考えられるエラーは次のとおりです。
- シリアル化エラー(JSON 形式が正しくない場合など)。
- テーブル スキーマと JSON データの不一致が原因で発生する型変換エラー。
- JSON データにあり、テーブル スキーマに存在しない余分なフィールド。
パイプラインは、これらのエラーを BigQuery のデッドレター テーブルに書き込みます。デフォルトでは、パイプラインは TABLE_NAME_error_records
という名前のデッドレター テーブルを自動的に作成します。ここで、TABLE_NAME
は出力テーブルの名前です。別の名前を使用するには、outputDeadletterTable
テンプレート パラメータを設定します。
クリーンアップする
このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。
プロジェクトを削除する
課金をなくす最も簡単な方法は、チュートリアル用に作成した Google Cloud プロジェクトを削除することです。
コンソール
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
gcloud
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
個々のリソースを削除する
後でプロジェクトを再利用する場合は、プロジェクトを残したまま、チュートリアル中に作成したリソースを削除します。
Dataflow パイプラインを停止する
コンソール
Google Cloud コンソールで、Dataflow の [ジョブ] ページに移動します。
停止するジョブをクリックします。
ジョブを停止するには、ジョブのステータスが「実行中」でなければなりません。
ジョブの詳細ページで、[停止] をクリックします。
[キャンセル] をクリックします。
選択を確定するには、[ジョブの停止] をクリックします。
gcloud
Dataflow ジョブをキャンセルするには、gcloud dataflow jobs
コマンドを使用します。
gcloud dataflow jobs list \
--filter 'NAME=JOB_NAME AND STATE=Running' \
--format 'value(JOB_ID)' \
--region "DATAFLOW_REGION" \
| xargs gcloud dataflow jobs cancel --region "DATAFLOW_REGION"
Google Cloud プロジェクト リソースをクリーンアップする
コンソール
Pub/Sub トピックとサブスクリプションを削除します。
Google Cloud コンソールで Pub/Sub の [トピック] ページに移動します。
作成したトピックを選択します。
[削除] をクリックして、トピックを永続的に削除します。
Google Cloud コンソールで Pub/Sub サブスクリプション ページに移動します。
トピックで作成したサブスクリプションを選択します。
[削除] をクリックして、サブスクリプションを永続的に削除します。
BigQuery テーブルとデータセットを削除します。
Google Cloud コンソールで、[BigQuery] ページに移動します。
[エクスプローラ] パネルで、プロジェクトを展開します。
削除するデータセットの横にある「アクションを表示」アイコン
をクリックし、[削除] をクリックします。
Cloud Storage バケットを削除します。
Google Cloud コンソールで Cloud Storage の [バケット] ページに移動します。
削除するバケットを選択し、[
削除] をクリックして、指示に沿って操作します。
gcloud
Pub/Sub サブスクリプションとトピックを削除するには、
gcloud pubsub subscriptions delete
コマンドとgcloud pubsub topics delete
コマンドを使用します。gcloud pubsub subscriptions delete SUBSCRIPTION_ID gcloud pubsub topics delete TOPIC_ID
BigQuery テーブルを削除するには、
bq rm
コマンドを使用します。bq rm -f -t PROJECT_ID:tutorial_dataset.tutorial
BigQuery データセットを削除します。データセットだけで料金は発生しません。
bq rm -r -f -d PROJECT_ID:tutorial_dataset
Cloud Storage バケットとそのオブジェクトを削除するには、
gcloud storage rm
コマンドを使用します。バケットだけでは料金は発生しません。gcloud storage rm gs://BUCKET_NAME --recursive
認証情報を取り消す
コンソール
プロジェクトを残しておく場合は、Compute Engine のデフォルトのサービス アカウントに付与したロールを取り消します。
- Google Cloud コンソールで、[IAM] ページに移動します。
プロジェクト、フォルダ、または組織を選択します。
アクセス権を取り消すプリンシパルを含む行を探します。その行の「プリンシパルを編集します」アイコン
をクリックします。取り消すロールごとに [削除
] ボタンをクリックして、[保存] をクリックします。
gcloud
- プロジェクトを残しておく場合は、Compute Engine のデフォルトのサービス アカウントに付与したロールを取り消します。以下の IAM ロールごとに次のコマンドを 1 回実行します。
roles/dataflow.admin
roles/dataflow.worker
roles/storage.admin
roles/pubsub.editor
roles/bigquery.dataEditor
gcloud projects remove-iam-policy-binding <var>PROJECT_ID</var> \ --member=serviceAccount:<var>PROJECT_NUMBER</var>-compute@developer.gserviceaccount.com \ --role=<var>ROLE</var>
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
次のステップ
- UDF によって Dataflow テンプレートを拡張する。
- Dataflow テンプレートの使用方法を学習する。
- Google 提供のすべてのテンプレートを表示する。
- Pub/Sub を使用してトピックを作成して使用する方法と、pull サブスクリプションを作成する方法を確認する。
- BigQuery を使用してデータセットを作成する方法を確認する。
- Pub/Sub サブスクリプションについて学習する。
- Google Cloud に関するリファレンス アーキテクチャ、図、ベスト プラクティスを確認する。Cloud アーキテクチャ センターをご覧ください。