1 対多の Pub/Sub システムを構築する

このチュートリアルでは、同期 RPC ではなく、Pub/Sub を介してメッセージを送信することによって通信する、一連のアプリケーションを設定する手順について説明します。アプリケーションを分離することにより、メッセージングには以下のようなメリットがあります。

  • アプリケーションの堅牢性が強化されます。
  • 開発をシンプルにできる可能性があります。

たとえば、呼び出し元(パブリッシャー)が受信者(サブスクライバー)を起動して稼働状態にする必要がなくなります。パブリッシャーは Pub/Sub にメッセージを送信します。 パブリッシャーは、メッセージを受信する必要があるサブスクライバー アプリケーションの種類と数を認識する必要はありません。そのため 1 つ以上のサブスクライバー アプリケーションが稼働しているときは、サービスは常にサブスクライバー アプリケーションにメッセージを配信できます。

システムの概要

このチュートリアルでは、次の図に示すように、1 対多の通信を使用して 2 つのサブスクライバーに「Hello, World!」メッセージを送信するパブリッシャー アプリケーションを起動します。

トピック、トピックに関連付けられたサブスクリプション、Cloud Pub/Sub との間でメッセージを送受信するパブリッシャー アプリケーションとサブスクライバー アプリケーションの図

2 つのサブスクライバー アプリケーションには同一のコードを使用しますが、起動するタイミングは異なります。このプロセスでは、Pub/Sub で非同期通信を有効にできます。このシステムを構築する手順は次のとおりです。

  1. アプリケーションが認証に使用する IAM サービス アカウントを作成します。
  2. IAM 権限を設定します。
  3. Pub/Sub トピックとサブスクリプションを作成する
  4. 1 つのパブリッシャーと 2 つのサブスクライバーの合計 3 つの独立したアプリケーションを起動させます。

始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Pub/Sub API:

    gcloud services enable pubsub.googleapis.com
  8. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsub.publisher, roles/pubsub.subscriber

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  10. Install the Google Cloud CLI.

  11. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  12. To initialize the gcloud CLI, run the following command:

    gcloud init
  13. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  14. Make sure that billing is enabled for your Google Cloud project.

  15. Enable the Pub/Sub API:

    gcloud services enable pubsub.googleapis.com
  16. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  17. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsub.publisher, roles/pubsub.subscriber

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.

Python をインストールする

このチュートリアルでは、Pub/Sub クライアント ライブラリを使用します。これには Python 3.7 以降が必要です。Python のインストールの手順を完了します。

Pub/Sub プロジェクトを設定する

パブリッシュ アプリケーションとサブスクリプション アプリケーション間のメッセージ フローを管理するには、トピックと 2 つの異なるサブスクリプションを作成します。

Pub/Sub トピックの作成

ID hello_topic を含むトピックを作成します。

gcloud pubsub topics create hello_topic

Pub/Sub サブスクリプションの作成

2 つのサブスクリプションを作成してトピックに関連付けます。

これらのサブスクリプションは、pull サブスクリプションの一種である StreamingPull サブスクリプションです。

サブスクリプション 1

ID sub_one を含むサブスクリプションを作成し、hello_topic に添付します。

gcloud pubsub subscriptions create sub_one --topic=hello_topic

サブスクリプション 2

ID sub_two を含むサブスクリプションを作成し、hello_topic に添付します。

gcloud pubsub subscriptions create sub_two --topic=hello_topic

1 対多のシステムを構築する

パブリッシャーとサブスクライバーのコードをダウンロードする

  1. このチュートリアルで必要な Pub/Sub Python ファイルをダウンロードします。

     git clone https://github.com/googleapis/python-pubsub.git
  2. 次の手順に進む前に、開いているターミナルをすべて閉じます。

3 つのターミナルを設定する

  1. 各チュートリアル アプリケーション(1 つのパブリッシャーと 2 つのサブスクライバー)につき 1 つのターミナルを起動します。便宜上、このチュートリアルでは次のターミナルを呼び出します。

    • publisher ターミナル
    • sub_one ターミナル
    • sub_two ターミナル
  2. publisher ターミナルで、pyenv-qs という名前の Python 仮想環境を作成して有効にします。

    Bash

    python -m venv pyenv-qs
    source pyenv-qs/bin/activate

    PowerShell

    py -m venv pyenv-qs
    .\pyenv-qs\Scripts\activate

    sub_one ターミナルと sub_one ターミナルで、次のコマンドを実行します。

    Bash

    source pyenv-qs/bin/activate

    PowerShell

    .\pyenv-qs\Scripts\activate

    activate コマンドを実行すると、コマンド プロンプトに (pyenv-qs) $ という値が表示されます。

  3. publisher ターミナルで pip を使用して Pub/Sub Python クライアント ライブラリをインストールします。

    python -m pip install --upgrade google-cloud-pubsub
  4. 3 つのターミナルすべてで、現在のプロジェクト ID で環境変数を設定します。この gcloud コマンドは、選択したプロジェクト ID を判別し変数として設定します。

    Bash

    export PROJECT=`gcloud config get-value project`

    PowerShell

    $env:PROJECT=$(gcloud config get-value project)
  5. 3 つすべてのターミナルで、サンプルコードを含むプロジェクトパスに変更します。

    cd python-pubsub/samples/snippets/quickstart/

アプリケーションを起動してメッセージ フローを確認する

サブスクライバー 1 のアプリケーションを起動する

sub_one ターミナルでサブスクライバー 1 を起動します。

Bash

python sub.py $PROJECT sub_one

PowerShell

py sub.py $env:PROJECT sub_one

起動すると、このアプリケーションはサーバーとの双方向ストリーミング接続を確立します。Pub/Sub は、その接続を介してメッセージを配信します。

サブスクライバー 1 アプリケーションは sub_one サブスクリプションでメッセージのリッスンを開始します。

パブリッシャー アプリケーションを起動する

publisher ターミナルでパブリッシャー アプリケーションを起動します。

Bash

python pub.py $PROJECT hello_topic

PowerShell

py pub.py $env:PROJECT hello_topic

パブリッシャー アプリケーションが起動すると、Pub/Sub システムが次の処理を行います。

  • パブリッシャー アプリケーションが「Hello, World!」というメッセージを Pub/Sub に送信します。既存のサブスクリプションにはこれは認識されません。このサーバーはメッセージ ID の割り当ても行います。

  • サブスクライバー 1 アプリケーションが「Hello World」というメッセージを受信して出力し、確認応答を Pub/Sub に送信します。

  • パブリッシャー アプリケーションが確認応答を出力します。この確認応答により、メッセージの処理に成功したことと、このサブスクライバーや他の sub_one サブスクライバーに再送信する必要がないことが Pub/Sub に通知されます。

Pub/Sub によりメッセージが sub_one から削除されます。

パブリッシャー アプリケーションはメッセージをパブリッシュし、メッセージ ID を割り当てます。サブスクライバー 1 アプリケーションは「Hello World」メッセージを受信し、確認応答を送信します。

サブスクライバー 2 アプリケーションを起動する

sub_two ターミナルでサブスクライバー 2 を起動します。

Bash

python sub.py $PROJECT sub_two

PowerShell

py sub.py $env:PROJECT sub_two

このサブスクライバーは、sub_two サブスクリプションに配信されたメッセージを受信します。サブスクライバー 2 では sub.py スクリプトが再利用されます。ただし、サブスクライバー 2 はパブリッシャーがトピックとサブスクリプションにメッセージを送信するまで起動されません。パブリッシャーサブスクライバー 2 を直接呼び出した場合、パブリッシャー アプリケーションはサブスクライバー 2 が起動するまで待機するかタイムアウトする必要があります。Pub/Sub は、サブスクライバー 2 へのメッセージを効率的に保存して管理します。

サブスクライバー 2 がリッスンを開始し、リッスンのため待機していたメッセージを sub_two で受信します。

これで、Pub/Sub を使用して開発を行う準備が整いました。

いかがでしたか

Cloud Pub/Sub のサポートページにその他の参考資料やリンクがありますので、こちらもご利用ください。

クリーンアップ

  1. 実行中のすべてのアプリケーションを停止します。
  2. ローカル環境からサンプルコードのディレクトリを削除します。
  3. トピックを削除します。

    gcloud pubsub topics delete hello_topic
  4. サブスクリプションを削除します。

    gcloud pubsub subscriptions delete sub_one
    gcloud pubsub subscriptions delete sub_two
  5. Google Cloud コンソールの [IAM と管理] セクションでチュートリアル プロジェクトを終了します。

  6. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  7. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

次のステップ

以下のことができます。

  • GitHub でチュートリアルの pub.py コードと sub.py コードを確認し、Pub/Sub の他のサンプルを閲覧できます。演習として、1 秒間隔で現地時間をパブリッシュするバージョンの pub.py を作成します。

  • メッセージのバッチ処理の方法を学びます。

  • push サブスクリプションを使用して、App Engine エンドポイントCloud Functions をトリガーするメッセージを受信します。

  • 再生を使用して、確認応答済みのメッセージを取得します。デフォルトでは、Pub/Sub はサブスクリプションから確認応答済みのメッセージを削除します。たとえば、このチュートリアルでは sub.py を再実行して、「Hello, World!」というメッセージを再度受信することはできません。再生機能を使用すると、メッセージが確認応答された後、そのメッセージを受信するようにサブスクリプションを設定できます。

  • 他の言語のクライアント ライブラリを使用します。