1 対多の Pub/Sub システムを構築する

このチュートリアルでは、同期 RPC ではなく、Pub/Sub を介してメッセージを送信することによって通信する、一連のアプリケーションを設定する手順について説明します。アプリケーションを分離することにより、メッセージングには以下のようなメリットがあります。

  • アプリケーションの堅牢性が強化されます。
  • 開発をシンプルにできる可能性があります。

たとえば、呼び出し元(パブリッシャー)が受信者(サブスクライバー)を起動して稼働状態にする必要がなくなります。パブリッシャーは Pub/Sub にメッセージを送信します。 パブリッシャーは、メッセージを受信する必要があるサブスクライバー アプリケーションの種類と数を認識する必要はありません。そのため 1 つ以上のサブスクライバー アプリケーションが稼働しているときは、サービスは常にサブスクライバー アプリケーションにメッセージを配信できます。

システムの概要

このチュートリアルでは、次の図に示すように、1 対多の通信を使用して 2 つのサブスクライバーに「Hello, World!」メッセージを送信するパブリッシャー アプリケーションを起動します。

トピック、トピックに関連付けられたサブスクリプション、Cloud Pub/Sub との間でメッセージを送受信するパブリッシャー アプリケーションとサブスクライバー アプリケーションの図

2 つのサブスクライバー アプリケーションには同一のコードを使用しますが、起動するタイミングは異なります。このプロセスでは、Pub/Sub で非同期通信を有効にできます。このシステムを構築する手順は次のとおりです。

  1. アプリケーションが認証に使用する IAM サービス アカウントを作成します。
  2. IAM 権限を設定します。
  3. Pub/Sub トピックとサブスクリプションを作成する
  4. 1 つのパブリッシャーと 2 つのサブスクライバーの合計 3 つの独立したアプリケーションを起動させます。

始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Enable the Pub/Sub API:

    gcloud services enable pubsub.googleapis.com
  7. Create local authentication credentials for your user account:

    gcloud auth application-default login
  8. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsub.publisher, roles/pubsub.subscriber

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  9. Install the Google Cloud CLI.
  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Make sure that billing is enabled for your Google Cloud project.

  13. Enable the Pub/Sub API:

    gcloud services enable pubsub.googleapis.com
  14. Create local authentication credentials for your user account:

    gcloud auth application-default login
  15. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsub.publisher, roles/pubsub.subscriber

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.

Python をインストールする

このチュートリアルでは、Pub/Sub クライアント ライブラリを使用します。これには Python 3.7 以降が必要です。Python のインストールの手順を完了します。

Pub/Sub プロジェクトを設定する

パブリッシュ アプリケーションとサブスクリプション アプリケーション間のメッセージ フローを管理するには、トピックと 2 つの異なるサブスクリプションを作成します。

Pub/Sub トピックの作成

ID hello_topic を含むトピックを作成します。

gcloud pubsub topics create hello_topic

Pub/Sub サブスクリプションの作成

2 つのサブスクリプションを作成してトピックに関連付けます。

これらのサブスクリプションは、pull サブスクリプションの一種である StreamingPull サブスクリプションです。

サブスクリプション 1

ID sub_one を含むサブスクリプションを作成し、hello_topic に添付します。

gcloud pubsub subscriptions create sub_one --topic=hello_topic

サブスクリプション 2

ID sub_two を含むサブスクリプションを作成し、hello_topic に添付します。

gcloud pubsub subscriptions create sub_two --topic=hello_topic

1 対多のシステムを構築する

パブリッシャーとサブスクライバーのコードをダウンロードする

  1. このチュートリアルで必要な Pub/Sub Python ファイルをダウンロードします。

     git clone https://github.com/googleapis/python-pubsub.git
  2. 次の手順に進む前に、開いているターミナルをすべて閉じます。

3 つのターミナルを設定する

  1. 各チュートリアル アプリケーション(1 つのパブリッシャーと 2 つのサブスクライバー)につき 1 つのターミナルを起動します。便宜上、このチュートリアルでは次のターミナルを呼び出します。

    • publisher ターミナル
    • sub_one ターミナル
    • sub_two ターミナル
  2. publisher ターミナルで、pyenv-qs という名前の Python 仮想環境を作成して有効にします。

    Bash

    python -m venv pyenv-qs
    source pyenv-qs/bin/activate

    PowerShell

    py -m venv pyenv-qs
    .\pyenv-qs\Scripts\activate

    sub_one ターミナルと sub_one ターミナルで、次のコマンドを実行します。

    Bash

    source pyenv-qs/bin/activate

    PowerShell

    .\pyenv-qs\Scripts\activate

    activate コマンドを実行すると、コマンド プロンプトに (pyenv-qs) $ という値が表示されます。

  3. publisher ターミナルで pip を使用して Pub/Sub Python クライアント ライブラリをインストールします。

    python -m pip install --upgrade google-cloud-pubsub
  4. 3 つのターミナルすべてで、現在のプロジェクト ID で環境変数を設定します。この gcloud コマンドは、選択したプロジェクト ID を判別し変数として設定します。

    Bash

    export PROJECT=`gcloud config get-value project`

    PowerShell

    $env:PROJECT=$(gcloud config get-value project)
  5. 3 つすべてのターミナルで、サンプルコードを含むプロジェクトパスに変更します。

    cd python-pubsub/samples/snippets/quickstart/

アプリケーションを起動してメッセージ フローを確認する

サブスクライバー 1 のアプリケーションを起動する

sub_one ターミナルでサブスクライバー 1 を起動します。

Bash

python sub.py $PROJECT sub_one

PowerShell

py sub.py $env:PROJECT sub_one

起動すると、このアプリケーションはサーバーとの双方向ストリーミング接続を確立します。Pub/Sub は、その接続を介してメッセージを配信します。

サブスクライバー 1 アプリケーションは sub_one サブスクリプションでメッセージのリッスンを開始します。

パブリッシャー アプリケーションを起動する

publisher ターミナルでパブリッシャー アプリケーションを起動します。

Bash

python pub.py $PROJECT hello_topic

PowerShell

py pub.py $env:PROJECT hello_topic

パブリッシャー アプリケーションが起動すると、Pub/Sub システムが次の処理を行います。

  • パブリッシャー アプリケーションが「Hello, World!」というメッセージを Pub/Sub に送信します。既存のサブスクリプションにはこれは認識されません。このサーバーはメッセージ ID の割り当ても行います。

  • サブスクライバー 1 アプリケーションが「Hello World」というメッセージを受信して出力し、確認応答を Pub/Sub に送信します。

  • パブリッシャー アプリケーションが確認応答を出力します。この確認応答により、メッセージの処理に成功したことと、このサブスクライバーや他の sub_one サブスクライバーに再送信する必要がないことが Pub/Sub に通知されます。

Pub/Sub によりメッセージが sub_one から削除されます。

パブリッシャー アプリケーションはメッセージをパブリッシュし、メッセージ ID を割り当てます。サブスクライバー 1 アプリケーションは「Hello World」メッセージを受信し、確認応答を送信します。

サブスクライバー 2 アプリケーションを起動する

sub_two ターミナルでサブスクライバー 2 を起動します。

Bash

python sub.py $PROJECT sub_two

PowerShell

py sub.py $env:PROJECT sub_two

このサブスクライバーは、sub_two サブスクリプションに配信されたメッセージを受信します。サブスクライバー 2 では sub.py スクリプトが再利用されます。ただし、サブスクライバー 2 はパブリッシャーがトピックとサブスクリプションにメッセージを送信するまで起動されません。パブリッシャーサブスクライバー 2 を直接呼び出した場合、パブリッシャー アプリケーションはサブスクライバー 2 が起動するまで待機するかタイムアウトする必要があります。Pub/Sub は、サブスクライバー 2 へのメッセージを効率的に保存して管理します。

サブスクライバー 2 がリッスンを開始し、リッスンのため待機していたメッセージを sub_two で受信します。

これで、Pub/Sub を使用して開発を行う準備が整いました。

いかがでしたか

Cloud Pub/Sub のサポートページにその他の参考資料やリンクがありますので、こちらもご利用ください。

クリーンアップ

  1. 実行中のすべてのアプリケーションを停止します。
  2. ローカル環境からサンプルコードのディレクトリを削除します。
  3. トピックを削除します。

    gcloud pubsub topics delete hello_topic
  4. サブスクリプションを削除します。

    gcloud pubsub subscriptions delete sub_one
    gcloud pubsub subscriptions delete sub_two
  5. Google Cloud コンソールの [IAM と管理] セクションでチュートリアル プロジェクトを終了します。

  6. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  7. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

次のステップ

以下のことができます。

  • GitHub でチュートリアルの pub.py コードと sub.py コードを確認し、Pub/Sub の他のサンプルを閲覧できます。演習として、1 秒間隔で現地時間をパブリッシュするバージョンの pub.py を作成します。

  • メッセージのバッチ処理の方法を学びます。

  • push サブスクリプションを使用して、App Engine エンドポイントCloud Functions をトリガーするメッセージを受信します。

  • 再生を使用して、確認応答済みのメッセージを取得します。デフォルトでは、Pub/Sub はサブスクリプションから確認応答済みのメッセージを削除します。たとえば、このチュートリアルでは sub.py を再実行して、「Hello, World!」というメッセージを再度受信することはできません。再生機能を使用すると、メッセージが確認応答された後、そのメッセージを受信するようにサブスクリプションを設定できます。

  • 他の言語のクライアント ライブラリを使用します。