Pub/Sub から BigQuery へのストリーミング


このチュートリアルでは、Pub/Sub Subscription to BigQuery テンプレートを使用し、 Google Cloud コンソールまたは Google Cloud CLI で Dataflow テンプレート ジョブを作成して実行します。このチュートリアルでは、Pub/Sub から JSON エンコードのメッセージを読み取り、BigQuery テーブルに書き込むストリーミング パイプラインの例を確認します。

ストリーミング分析とデータ統合パイプラインでは、Pub/Sub を使用してデータの取り込みと配信を行います。Pub/Sub を使用すると、イベント プロデューサー(パブリッシャー)とコンシューマー(サブスクライバー)のシステムを作成できます。パブリッシャーはイベントを Pub/Sub サービスに非同期に送信します。Pub/Sub は、イベントに反応する必要のあるすべてのサービスにイベントを配信します。

Dataflow は、ストリーミング(リアルタイム)モードとバッチモードの両方でデータの変換と拡充を行うためのフルマネージド型のサービスです。Apache Beam SDK を使用して受信データを変換し、変換したデータを出力する簡素化されたパイプライン開発環境を提供します。

このワークフローの利点は、BigQuery に書き込まれる前にメッセージ データを UDF を使用して変換できることです。

このシナリオで Dataflow パイプラインを実行する前に、UDF を使用した Pub/Sub BigQuery サブスクリプションが要件を満たしているかどうかを検討してください。

目標

  • Pub/Sub トピックを作成する。
  • テーブルとスキーマを使用して BigQuery データセットを作成する。
  • Dataflow と Google 提供のストリーミング テンプレートを使用して、Pub/Sub サブスクリプションから BigQuery にデータをストリーミングする。

費用

このドキュメントでは、課金対象である次の Google Cloudコンポーネントを使用します。

  • Dataflow
  • Pub/Sub
  • Cloud Storage
  • BigQuery

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。

新規の Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

このドキュメントに記載されているタスクの完了後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。

始める前に

このセクションでは、プロジェクトを選択して API を有効にし、ユーザー アカウントとワーカー サービス アカウントに適切なロールを付与する方法について説明します。

コンソール

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.

    Enable the APIs

  8. このチュートリアルの手順を完了するには、ユーザー アカウントにサービス アカウント ユーザーのロールが必要です。Compute Engine のデフォルトのサービス アカウントには、Dataflow ワーカーDataflow 管理者、Pub/Sub 編集者、Storage オブジェクト管理者、BigQuery データ編集者のロールが付与されている必要があります。 Google Cloud コンソールで、必要なロールを追加するには:

    1. Google Cloud コンソールで、[IAM] ページに移動します。

      [IAM] に移動
    2. プロジェクトを選択します。
    3. ユーザー アカウントを含む行で、「プリンシパルを編集します」アイコン をクリックし、[ 別のロールを追加] をクリックします。
    4. プルダウン リストで、[サービス アカウント ユーザー] のロールを選択します。
    5. Compute Engine のデフォルトのサービス アカウントを含む行で、「プリンシパルを編集します」アイコン をクリックし、[ 別のロールを追加] をクリックします。
    6. プルダウン リストで、[Dataflow ワーカー] ロールを選択します。
    7. この手順を Dataflow 管理者Pub/Sub 編集者Storage オブジェクト管理者BigQuery データ編集者ロールに対して繰り返し、[保存] をクリックします。

      ロール付与の詳細については、コンソールを使用して IAM ロールを付与するをご覧ください。

gcloud

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com
  8. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  10. Install the Google Cloud CLI.

  11. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  12. To initialize the gcloud CLI, run the following command:

    gcloud init
  13. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  14. Make sure that billing is enabled for your Google Cloud project.

  15. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com
  16. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  17. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  18. Compute Engine のデフォルト サービス アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

    次のように置き換えます。

    • PROJECT_ID: プロジェクト ID。
    • PROJECT_NUMBER: プロジェクトの番号。プロジェクト番号を確認するには、gcloud projects describe コマンドを使用します。
    • SERVICE_ACCOUNT_ROLE: 個々のロール。

Cloud Storage バケットを作成する

まず、 Google Cloud コンソールまたは Google Cloud CLI を使用して Cloud Storage バケットを作成します。Dataflow パイプラインは、このバケットを一時ストレージの場所として使用します。

コンソール

  1. Google Cloud コンソールで Cloud Storage の [バケット] ページに移動します。

    [バケット] に移動

  2. [作成] をクリックします。

  3. [バケットの作成] ページの [バケットに名前を付ける] で、バケット名の要件を満たす名前を入力します。Cloud Storage のバケット名は、グローバルに一意にする必要があります。他のオプションは選択しないでください。

  4. [作成] をクリックします。

gcloud

gcloud storage buckets create コマンドを実行します。

gcloud storage buckets create gs://BUCKET_NAME

BUCKET_NAME は、バケットの命名要件を満たす Cloud Storage バケットの名前に置き換えます。Cloud Storage のバケット名は、グローバルに一意である必要がある。

Pub/Sub トピックとサブスクリプションを作成する

Pub/Sub トピックを作成し、そのトピックに対するサブスクリプションを作成します。

Console

トピックを作成するには、次の手順で操作します。

  1. Google Cloud コンソールで、Pub/Sub の [トピック] ページに移動します。

    [トピック] に移動

  2. [トピックを作成] をクリックします。

  3. [トピック ID] フィールドに、トピックの ID を入力します。トピックの指定方法については、トピックまたはサブスクリプションの指定方法のガイドラインをご覧ください。

  4. [デフォルトのサブスクリプションを追加する] オプションは、そのまま保持します。他のオプションは選択しないでください。

  5. [作成] をクリックします。

  6. トピックの詳細ページの [サブスクリプション ID] に、作成されたサブスクリプションの名前が表示されます。この値をメモしておきます。

gcloud

トピックを作成するには、gcloud pubsub topics create コマンドを実行します。サブスクリプションの指定方法については、トピックまたはサブスクリプションの指定方法のガイドラインをご覧ください。

gcloud pubsub topics create TOPIC_ID

TOPIC_ID は、Pub/Sub トピックの名前に置き換えます。

トピックのサブスクリプションを作成するには、gcloud pubsub subscriptions create コマンドを実行します。

gcloud pubsub subscriptions create --topic TOPIC_ID SUBSCRIPTION_ID

SUBSCRIPTION_ID は、Pub/Sub サブスクリプションの名前に置き換えます。

BigQuery テーブルを作成する

この手順では、次のスキーマを持つ BigQuery テーブルを作成します。

列名 データ型
name STRING
customer_id INTEGER

BigQuery データセットがまだない場合は、まず作成します。詳細については、データセットの作成をご覧ください。次に、新しい空のテーブルを作成します。

コンソール

  1. [BigQuery] ページに移動します。

    BigQuery に移動

  2. [エクスプローラ] ペインでプロジェクトを開き、データセットを選択します。

  3. [データセット情報] セクションで、[ テーブルを作成] をクリックします。

  4. [テーブルの作成元] リストで [空のテーブル] を選択します。

  5. [テーブル] ボックスにテーブルの名前を入力します。

  6. [スキーマ] セクションで [テキストとして編集] をクリックします。

  7. 次のスキーマ定義を貼り付けます。

    name:STRING,
    customer_id:INTEGER
    
  8. [テーブルを作成] をクリックします。

gcloud

bq mk コマンドを使用します。

bq mk --table \
  PROJECT_ID:DATASET_NAME.TABLE_NAME \
  name:STRING,customer_id:INTEGER

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • DATASET_NAME: データセットの名前。
  • TABLE_NAME: 作成するテーブルの名前。

パイプラインを実行する

Google 提供の Pub/Sub Subscription to BigQuery テンプレートを使用して、ストリーミング パイプラインを実行します。このパイプラインは Pub/Sub トピックから受信データを取得し、BigQuery データセットに出力します。

コンソール

  1. Google Cloud コンソールで、Dataflow の [ジョブ] ページに移動します。

    [ジョブ] に移動

  2. [テンプレートからジョブを作成] をクリックします。

  3. Dataflow ジョブの [ジョブ名] を入力します。

  4. [リージョン エンドポイント] で、Dataflow ジョブのリージョンを選択します。

  5. [Dataflow テンプレート] で、Pub/Sub Subscription to BigQuery テンプレートを選択します。

  6. [BigQuery 出力テーブル] で [参照] を選択し、BigQuery テーブルを選択します。

  7. Pub/Sub の入力サブスクリプションのリストで、Pub/Sub サブスクリプションを選択します。

  8. [一時的な場所] に、次のように入力します。

    gs://BUCKET_NAME/temp/
    

    BUCKET_NAME は Cloud Storage バケットの名前で置き換えます。temp フォルダには、Dataflow ジョブの一時ファイルが保存されます。

  9. [ジョブを実行] をクリックします。

gcloud

シェルまたはターミナルでテンプレートを実行するには、gcloud dataflow jobs run コマンドを使用します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-DATAFLOW_REGION/latest/PubSub_Subscription_to_BigQuery \
    --region DATAFLOW_REGION \
    --staging-location gs://BUCKET_NAME/temp \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME

次の変数を置き換えます。

  • JOB_NAME: ジョブの名前
  • DATAFLOW_REGION: ジョブのリージョン
  • PROJECT_ID: Google Cloud プロジェクトの名前
  • SUBSCRIPTION_ID: Pub/Sub サブスクリプションの名前
  • DATASET_NAME: BigQuery データセットの名前
  • TABLE_NAME: BigQuery テーブルの名前

Pub/Sub にメッセージをパブリッシュする

Dataflow ジョブの開始後に Pub/Sub にメッセージをパブリッシュすると、パイプラインがメッセージを BigQuery に書き込みます。

コンソール

  1. Google Cloud コンソールで、[Pub/Sub] > [トピック] ページに移動します。

    [トピック] に移動

  2. トピックのリストで、トピックの名前をクリックします。

  3. [メッセージ] をクリックします。

  4. [メッセージをパブリッシュ] をクリックします。

  5. [メッセージの数] に「10」と入力します。

  6. [メッセージ本文] に「{"name": "Alice", "customer_id": 1}」と入力します。

  7. [公開] をクリックします。

gcloud

メッセージをトピックにパブリッシュするには、gcloud pubsub topics publish コマンドを使用します。

for run in {1..10}; do
  gcloud pubsub topics publish TOPIC_ID --message='{"name": "Alice", "customer_id": 1}'
done

TOPIC_ID は、実際のプロジェクト名で置き換えます。

結果を表示する

BigQuery テーブルに書き込まれたデータを表示します。テーブルにデータが表示されるまで、最大で 1 分かかることがあります。

コンソール

  1. Google Cloud コンソールで、[BigQuery] ページに移動します。
    [BigQuery] ページに移動

  2. [クエリエディタ] ペインで、次のクエリを実行します。

    SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`
    LIMIT 1000
    

    次の変数を置き換えます。

    • PROJECT_ID: Google Cloudプロジェクトの名前
    • DATASET_NAME: BigQuery データセットの名前
    • TABLE_NAME: BigQuery テーブルの名前

gcloud

次のクエリを実行して、BigQuery の結果を確認します。

bq query --use_legacy_sql=false 'SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`'

次の変数を置き換えます。

  • PROJECT_ID: Google Cloudプロジェクトの名前
  • DATASET_NAME: BigQuery データセットの名前
  • TABLE_NAME: BigQuery テーブルの名前

UDF を使用してデータを変換する

このチュートリアルでは、Pub/Sub メッセージが JSON 形式で、BigQuery テーブル スキーマが JSON データと一致していることを前提としています。

BigQuery に書き込まれる前にデータを変換する JavaScript ユーザー定義関数(UDF)を指定することもできます。UDF は、フィルタリング、個人を特定できる情報(PII)の削除、追加のフィールドによるデータの拡充などの処理も実行できます。

詳細については、Dataflow テンプレートのユーザー定義関数を作成するをご覧ください。

デッドレター テーブルを使用する

ジョブの実行中に、パイプラインが個々のメッセージを BigQuery に書き込めないことがあります。考えられるエラーは次のとおりです。

  • シリアル化エラー(JSON 形式が正しくない場合など)。
  • テーブル スキーマと JSON データの不一致が原因で発生する型変換エラー。
  • JSON データにあり、テーブル スキーマに存在しない余分なフィールド。

パイプラインは、これらのエラーを BigQuery のデッドレター テーブルに書き込みます。デフォルトでは、パイプラインは TABLE_NAME_error_records という名前のデッドレター テーブルを自動的に作成します。ここで、TABLE_NAME は出力テーブルの名前です。別の名前を使用するには、outputDeadletterTable テンプレート パラメータを設定します。

クリーンアップする

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。

プロジェクトを削除する

課金をなくす最も簡単な方法は、チュートリアル用に作成した Google Cloud プロジェクトを削除することです。

コンソール

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

gcloud

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

個々のリソースを削除する

後でプロジェクトを再利用する場合は、プロジェクトを残したまま、チュートリアル中に作成したリソースを削除します。

Dataflow パイプラインを停止する

コンソール

  1. Google Cloud コンソールで、Dataflow の [ジョブ] ページに移動します。

    [ジョブ] に移動

  2. 停止するジョブをクリックします。

    ジョブを停止するには、ジョブのステータスが「実行中」でなければなりません。

  3. ジョブの詳細ページで、[停止] をクリックします。

  4. [キャンセル] をクリックします。

  5. 選択を確定するには、[ジョブの停止] をクリックします。

gcloud

Dataflow ジョブをキャンセルするには、gcloud dataflow jobs コマンドを使用します。

gcloud dataflow jobs list \
  --filter 'NAME=JOB_NAME AND STATE=Running' \
  --format 'value(JOB_ID)' \
  --region "DATAFLOW_REGION" \
  | xargs gcloud dataflow jobs cancel --region "DATAFLOW_REGION"

Google Cloud プロジェクト リソースをクリーンアップする

コンソール

  1. Pub/Sub トピックとサブスクリプションを削除します。

    1. Google Cloud コンソールで Pub/Sub の [トピック] ページに移動します。

      [トピック] に移動

    2. 作成したトピックを選択します。

    3. [削除] をクリックして、トピックを永続的に削除します。

    4. Google Cloud コンソールで Pub/Sub サブスクリプション ページに移動します。

      [サブスクリプション] に移動

    5. トピックで作成したサブスクリプションを選択します。

    6. [削除] をクリックして、サブスクリプションを永続的に削除します。

  2. BigQuery テーブルとデータセットを削除します。

    1. Google Cloud コンソールで、[BigQuery] ページに移動します。

      [BigQuery] に移動

    2. [エクスプローラ] パネルで、プロジェクトを展開します。

    3. 削除するデータセットの横にある「アクションを表示」アイコン をクリックし、[削除] をクリックします。

  3. Cloud Storage バケットを削除します。

    1. Google Cloud コンソールで Cloud Storage の [バケット] ページに移動します。

      [バケット] に移動

    2. 削除するバケットを選択し、[削除] をクリックして、指示に沿って操作します。

gcloud

  1. Pub/Sub サブスクリプションとトピックを削除するには、gcloud pubsub subscriptions delete コマンドと gcloud pubsub topics delete コマンドを使用します。

    gcloud pubsub subscriptions delete SUBSCRIPTION_ID
    gcloud pubsub topics delete TOPIC_ID
    
  2. BigQuery テーブルを削除するには、bq rm コマンドを使用します。

    bq rm -f -t PROJECT_ID:tutorial_dataset.tutorial
    
  3. BigQuery データセットを削除します。データセットだけで料金は発生しません。

    bq rm -r -f -d PROJECT_ID:tutorial_dataset
    
  4. Cloud Storage バケットとそのオブジェクトを削除するには、gcloud storage rm コマンドを使用します。バケットだけでは料金は発生しません。

    gcloud storage rm gs://BUCKET_NAME --recursive
    

認証情報を取り消す

コンソール

プロジェクトを残しておく場合は、Compute Engine のデフォルトのサービス アカウントに付与したロールを取り消します。

  1. Google Cloud コンソールで、[IAM] ページに移動します。

[IAM] に移動

  1. プロジェクト、フォルダ、または組織を選択します。

  2. アクセス権を取り消すプリンシパルを含む行を探します。その行の「プリンシパルを編集します」アイコン をクリックします。

  3. 取り消すロールごとに [削除] ボタンをクリックして、[保存] をクリックします。

gcloud

  • プロジェクトを残しておく場合は、Compute Engine のデフォルトのサービス アカウントに付与したロールを取り消します。以下の IAM ロールごとに次のコマンドを 1 回実行します。
    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
      gcloud projects remove-iam-policy-binding <var>PROJECT_ID</var> \
      --member=serviceAccount:<var>PROJECT_NUMBER</var>-compute@developer.gserviceaccount.com \
      --role=<var>ROLE</var>
    

  • Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  • Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

次のステップ