Pub/Sub にデータをエクスポートする(リバース ETL)

Pub/Sub にデータをエクスポートするには、BigQuery の継続的クエリを使用する必要があります。継続的クエリのプレビューに登録するには、リクエスト フォームにご記入ください。この機能に関するフィードバックやサポートのリクエストを行う場合は、bq-continuous-queries-feedback@google.com 宛てにメールをお送りください。

このドキュメントでは、BigQuery から Pub/Sub へのリバース ETL(RETL)を設定する方法について説明します。これを行うには、継続的クエリEXPORT DATA ステートメントを使用して、BigQuery から Pub/Sub トピックにデータをエクスポートします。

Pub/Sub への RETL ワークフローを使用すると、BigQuery の分析機能と Pub/Sub の非同期でスケーラブルなグローバル メッセージング サービスを組み合わせることができます。このワークフローにより、ダウンストリームのアプリケーションとサービスにイベント ドリブンでデータを提供できます。

前提条件

サービス アカウントを作成する必要があります。結果を Pub/Sub トピックにエクスポートする継続的クエリを実行するには、サービス アカウントが必要です。

継続的クエリの結果をメッセージとして受信するには、Pub/Sub トピックと、ターゲット アプリケーションがそれらのメッセージを受信するための Pub/Sub サブスクリプションを作成する必要があります。

必要なロール

このセクションでは、継続的クエリを作成するユーザー アカウントと、継続的クエリを実行するサービス アカウントに必要なロールと権限について説明します。

ユーザー アカウント権限

BigQuery でジョブを作成するには、ユーザー アカウントに bigquery.jobs.create IAM 権限が必要です。次の各 IAM ロールには、bigquery.jobs.create 権限が付与されています。

サービス アカウントを使用して実行されるジョブを送信するには、ユーザー アカウントにサービス アカウント ユーザー(roles/iam.serviceAccountUserロールが必要です。同じユーザー アカウントを使用してサービス アカウントを作成する場合、ユーザー アカウントにはサービス アカウント管理者(roles/iam.serviceAccountAdminロールが必要です。プロジェクト内のすべてのサービス アカウントではなく、単一のサービス アカウントへのユーザーのアクセスを制限する方法については、単一のロールを付与するをご覧ください。

ユーザー アカウントが継続的クエリのユースケースに必要な API を有効にしなければならない場合は、ユーザー アカウントに Service Usage 管理者(roles/serviceusage.serviceUsageAdminロールが必要です。

サービス アカウントの権限

BigQuery テーブルからデータをエクスポートするには、サービス アカウントに bigquery.tables.export IAM 権限が必要です。次の IAM ロールにはそれぞれ bigquery.tables.export 権限が付与されています。

サービス アカウントが Pub/Sub にアクセスできるようにするには、サービス アカウントに次の IAM ロールの両方を付与する必要があります。

必要な権限は、カスタムロールを使用して取得することもできます。

始める前に

Enable the BigQuery and Pub/Sub APIs.

Enable the APIs

Pub/Sub へのエクスポート

EXPORT DATA ステートメントを使用して、データを Pub/Sub トピックにエクスポートします。

コンソール

  1. Google Cloud コンソールで [BigQuery] ページに移動します。

    [BigQuery] に移動

  2. クエリエディタで、[展開] > [クエリ設定] の順にクリックします。

  3. [継続的クエリ] セクションで、[継続的クエリモードを使用する] チェックボックスをオンにします。

  4. [サービス アカウント] ボックスで、作成したサービス アカウントを選択します。

  5. [保存] をクリックします。

  6. クエリエディタで次のステートメントを入力します。

    EXPORT DATA
    OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID'
    ) AS
    (
    QUERY
    );

    次のように置き換えます。

    • PROJECT_ID: プロジェクト ID。
    • TOPIC_ID: Pub/Sub トピック ID。トピック ID は、Google Cloud コンソールの [トピック] ページで取得できます。
    • QUERY: エクスポートするデータを選択する SQL ステートメント。SQL ステートメントには、サポートされているオペレーションのみを含める必要があります。
  7. [実行] をクリックします。

bq

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. コマンドラインで、次のフラグを指定して bq query コマンドを使用し、継続的クエリを実行します。

    • クエリを連続的に実行するには、--continuous フラグを true に設定します。
    • --connection_property フラグを使用して、使用するサービス アカウントを指定します。
    bq query --project_id=PROJECT_ID --use_legacy_sql=false \
    --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \
    'EXPORT DATA OPTIONS (format = "CLOUD_PUBSUB", uri = "https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID") AS (QUERY);'

    次のように置き換えます。

    • PROJECT_ID: プロジェクト ID。
    • SERVICE_ACCOUNT_EMAIL: サービス アカウントのメールアドレス。サービス アカウントのメールアドレスは、Google Cloud コンソールの [サービス アカウント] ページで確認できます。
    • QUERY: エクスポートするデータを選択する SQL ステートメント。SQL ステートメントには、サポートされているオペレーションのみを含める必要があります。

API

  1. jobs.insert メソッドを呼び出して、継続的クエリを実行します。Job リソースJobConfigurationQuery リソースで、次のフィールドを設定します。

    • クエリを継続的に実行するには、continuous フィールドを true に設定します。
    • connection_property フィールドに、使用するサービス アカウントを指定します。
    curl --request POST \
      'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs'
      --header 'Authorization: Bearer $(gcloud auth print-access-token) \
      --header 'Accept: application/json' \
      --header 'Content-Type: application/json' \
      --data '("configuration":("query":"EXPORT DATA OPTIONS (format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID') AS (QUERY);","useLegacySql":false,"continuous":true,"connectionProperties":["key": "service_account","value":"SERVICE_ACCOUNT_EMAIL"]))' \
      --compressed

    次のように置き換えます。

    • PROJECT_ID: プロジェクト ID。
    • QUERY: エクスポートするデータを選択する SQL ステートメント。SQL ステートメントには、サポートされているオペレーションのみを含める必要があります。
    • SERVICE_ACCOUNT_EMAIL: サービス アカウントのメールアドレス。サービス アカウントのメールアドレスは、Google Cloud コンソールの [サービス アカウント] ページで確認できます。

複数の列を Pub/Sub にエクスポートする

出力に複数の列を含める場合は、列値を含む構造体列を作成し、TO_JSON_STRING 関数を使用して構造体値を JSON 文字列に変換します。次の例では、4 つの列からデータをエクスポートし、JSON 文字列としてフォーマットします。

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude)) AS message
  FROM `myproject.real_time_taxi_streaming.taxi_rides`
  WHERE ride_status = 'enroute'
);

エクスポートの最適化

継続的クエリジョブのパフォーマンスが使用可能なコンピューティング リソースによって制限されていると思われる場合は、BigQuery CONTINUOUS スロット予約割り当てのサイズを増やしてみてください。

制限事項

  • エクスポートされたデータは、単一の STRING 列または BYTES 列で構成する必要があります。列名は任意の名前にできます。
  • Pub/Sub にエクスポートするには、継続的クエリを使用する必要があります。
  • 継続的クエリで Pub/Sub トピックにスキーマを渡すことはできません。
  • スキーマを使用する Pub/Sub トピックにデータをエクスポートすることはできません。
  • NULL 値を含むデータはエクスポートできません。継続的クエリに WHERE message IS NOT NULL フィルタを追加すると、クエリ結果から NULL 値を除外できます。
  • エクスポートするデータは Pub/Sub の割り当てを超えないようにする必要があります。

料金

継続的クエリでデータをエクスポートする場合は、BigQuery 容量のコンピューティング料金に基づいて課金されます。継続的クエリを実行するには、Enterprise エディションまたは Enterprise Plus エディションを使用する予約と、CONTINUOUS ジョブタイプを使用する予約割り当てが必要です。

データのエクスポート後、Pub/Sub の使用料金が発生します。詳細については、Pub/Sub の料金をご覧ください。