Lite サブスクリプションからメッセージを受信する

このページでは、Lite サブスクリプションからメッセージを受信する方法について説明します。メッセージは、Java 用の Pub/Sub Lite クライアント ライブラリを使用して受信できます。

Lite サブスクリプションでは、Lite トピックがサブスクライバー アプリケーションに接続され、サブスクライバーは、Lite サブスクリプションからメッセージを受信します。サブスクライバーは、パブリッシャー アプリケーションが Lite トピックに送信するメッセージをすべて受信します。これには Lite サブスクリプションを作成する前にパブリッシャーが送信したメッセージも含まれます。

Lite サブスクリプションからメッセージを受信する前に、Lite トピックを作成し、その Lite トピックに Lite サブスクリプションを作成して、メッセージをパブリッシュします。

メッセージの受信

Lite サブスクリプションからメッセージを受信するには、Lite サブスクリプションからのメッセージをリクエストします。クライアント ライブラリは、Lite サブスクリプションに関連付けられた Lite トピックのパーティションに自動的に接続します。 複数のサブスクライバー クライアントがインスタンス化されると、メッセージはすべてのクライアントに分散されます。トピック内のパーティションの数によって、サブスクリプションに同時接続できるサブスクライバー クライアントの最大数が決まります。

サブスクライバーが初期化してメッセージの受信を開始するまでに最大で 1 分程度かかることがあります。初期化後は、最小限のレイテンシでメッセージを受信します。

次の例は、Lite サブスクリプションからメッセージを受信する方法を示しています。

gcloud

このコマンドを使用するには、Python 3.6 以降と grpcio Python パッケージのインストールが必要です。MacOS、Linux、Cloud Shell のユーザーの場合、以下を実行します。

sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1

メッセージを受信するには、gcloud pubsub lite-subscriptions subscribe コマンドを使用します。

gcloud pubsub lite-subscriptions subscribe SUBSCRIPTION_ID \
    --location=LITE_LOCATION \
    --auto-ack

以下を置き換えます。

  • SUBSCRIPTION_ID: Lite サブスクリプションの ID
  • LITE_LOCATION: Lite サブスクリプションのロケーション

Go

このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Go の設定手順を実施してください。


package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"sync/atomic"
	"time"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
)

func main() {
	// NOTE: Set these flags for an existing Pub/Sub Lite subscription containing
	// published messages when running this sample.
	projectID := flag.String("project_id", "", "Cloud Project ID")
	zone := flag.String("zone", "", "Cloud Zone where the topic resides, e.g. us-central1-a")
	subscriptionID := flag.String("subscription_id", "", "Existing Pub/Sub Lite subscription")
	timeout := flag.Duration("timeout", 90*time.Second, "The duration to receive messages")
	flag.Parse()

	ctx := context.Background()
	subscriptionPath := fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", *projectID, *zone, *subscriptionID)

	// Configure flow control settings. These settings apply per partition.
	// The message stream is paused based on the maximum size or number of
	// messages that the subscriber has already received, whichever condition is
	// met first.
	settings := pscompat.ReceiveSettings{
		// 10 MiB. Must be greater than the allowed size of the largest message
		// (1 MiB).
		MaxOutstandingBytes: 10 * 1024 * 1024,
		// 1,000 outstanding messages. Must be > 0.
		MaxOutstandingMessages: 1000,
	}

	// Create the subscriber client.
	subscriber, err := pscompat.NewSubscriberClientWithSettings(ctx, subscriptionPath, settings)
	if err != nil {
		log.Fatalf("pscompat.NewSubscriberClientWithSettings error: %v", err)
	}

	// Listen for messages until the timeout expires.
	log.Printf("Listening to messages on %s for %v...\n", subscriptionPath, *timeout)
	cctx, cancel := context.WithTimeout(ctx, *timeout)
	defer cancel()
	var receiveCount int32

	// Receive blocks until the context is cancelled or an error occurs.
	if err := subscriber.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
		// NOTE: May be called concurrently; synchronize access to shared memory.
		atomic.AddInt32(&receiveCount, 1)

		// Metadata decoded from the message ID contains the partition and offset.
		metadata, err := pscompat.ParseMessageMetadata(msg.ID)
		if err != nil {
			log.Fatalf("Failed to parse %q: %v", msg.ID, err)
		}

		fmt.Printf("Received (partition=%d, offset=%d): %s\n", metadata.Partition, metadata.Offset, string(msg.Data))
		msg.Ack()
	}); err != nil {
		log.Fatalf("SubscriberClient.Receive error: %v", err)
	}

	fmt.Printf("Received %d messages\n", receiveCount)
}

Java

このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Java の設定手順を実施してください。

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.cloudpubsub.MessageTransforms;
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
import com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings;
import com.google.protobuf.Timestamp;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscriberExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing subscription for the subscribe example to work.
    String subscriptionId = "your-subscription-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    subscriberExample(cloudRegion, zoneId, projectNumber, subscriptionId, regional);
  }

  public static void subscriberExample(
      String cloudRegion, char zoneId, long projectNumber, String subscriptionId, boolean regional)
      throws ApiException {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    SubscriptionPath subscriptionPath =
        SubscriptionPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    // The message stream is paused based on the maximum size or number of messages that the
    // subscriber has already received, whichever condition is met first.
    FlowControlSettings flowControlSettings =
        FlowControlSettings.builder()
            // 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
            .setBytesOutstanding(10 * 1024 * 1024L)
            // 1,000 outstanding messages. Must be >0.
            .setMessagesOutstanding(1000L)
            .build();

    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          System.out.println("Id : " + MessageMetadata.decode(message.getMessageId()));
          System.out.println("Data : " + message.getData().toStringUtf8());
          System.out.println("Ordering key : " + message.getOrderingKey());
          System.out.println("Attributes : ");
          message
              .getAttributesMap()
              .forEach(
                  (key, value) -> {
                    if (key == MessageTransforms.PUBSUB_LITE_EVENT_TIME_TIMESTAMP_PROTO) {
                      Timestamp ts = MessageTransforms.decodeAttributeEventTime(value);
                      System.out.println(key + " = " + ts.toString());
                    } else {
                      System.out.println(key + " = " + value);
                    }
                  });

          // Acknowledge the message.
          consumer.ack();
        };

    SubscriberSettings subscriberSettings =
        SubscriberSettings.newBuilder()
            .setSubscriptionPath(subscriptionPath)
            .setReceiver(receiver)
            // Flow control settings are set at the partition level.
            .setPerPartitionFlowControlSettings(flowControlSettings)
            .build();

    Subscriber subscriber = Subscriber.create(subscriberSettings);

    // Start the subscriber. Upon successful starting, its state will become RUNNING.
    subscriber.startAsync().awaitRunning();

    System.out.println("Listening to messages on " + subscriptionPath.toString() + "...");

    try {
      System.out.println(subscriber.state());
      // Wait 90 seconds for the subscriber to reach TERMINATED state. If it encounters
      // unrecoverable errors before then, its state will change to FAILED and an
      // IllegalStateException will be thrown.
      subscriber.awaitTerminated(90, TimeUnit.SECONDS);
    } catch (TimeoutException t) {
      // Shut down the subscriber. This will change the state of the subscriber to TERMINATED.
      subscriber.stopAsync().awaitTerminated();
      System.out.println("Subscriber is shut down: " + subscriber.state());
    }
  }
}

Python

このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Python の設定手順を実施してください。

from concurrent.futures._base import TimeoutError
from google.pubsub_v1 import PubsubMessage
from google.cloud.pubsublite.cloudpubsub import SubscriberClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    FlowControlSettings,
    MessageMetadata,
    SubscriptionPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# subscription_id = "your-subscription-id"
# timeout = 90
# regional = True

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

subscription_path = SubscriptionPath(project_number, location, subscription_id)
# Configure when to pause the message stream for more incoming messages based on the
# maximum size or number of messages that a single-partition subscriber has received,
# whichever condition is met first.
per_partition_flow_control_settings = FlowControlSettings(
    # 1,000 outstanding messages. Must be >0.
    messages_outstanding=1000,
    # 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
    bytes_outstanding=10 * 1024 * 1024,
)

def callback(message: PubsubMessage):
    message_data = message.data.decode("utf-8")
    metadata = MessageMetadata.decode(message.message_id)
    print(
        f"Received {message_data} of ordering key {message.ordering_key} with id {metadata}."
    )
    message.ack()

# SubscriberClient() must be used in a `with` block or have __enter__() called before use.
with SubscriberClient() as subscriber_client:

    streaming_pull_future = subscriber_client.subscribe(
        subscription_path,
        callback=callback,
        per_partition_flow_control_settings=per_partition_flow_control_settings,
    )

    print(f"Listening for messages on {str(subscription_path)}...")

    try:
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError or KeyboardInterrupt:
        streaming_pull_future.cancel()
        assert streaming_pull_future.done()

クライアント ライブラリは、Lite トピックの各パーティションへの双方向ストリーミング接続を確立します。

  1. サブスクライバーはパーティションへの接続をリクエストします。

  2. Pub/Sub Lite サービスは、サブスクライバーにメッセージを配信します。

サブスクライバーはメッセージを処理した後、メッセージに対する確認応答を行う必要があります。クライアント ライブラリは、コールバック内で非同期にメッセージを処理して確認応答を行います。サブスクライバーがメモリに格納できる確認応答されていないメッセージの数を制限するには、フロー制御の設定を構成します

複数のサブスクライバーが同じ Lite サブスクリプションからメッセージを受信した場合、Pub/Sub Lite サービスは、各サブスクライバーを同じ比率でパーティションに接続します。たとえば、2 つのサブスクライバーが同じ Lite サブスクリプションを使用し、2 つのパーティションを持つ Lite トピックに接続した場合、各サブスクライバーがいずれかのパーティションからメッセージを受信します。

メッセージの確認応答

メッセージを確認応答するには、Lite サブスクリプションに確認応答を送信します。

Go

確認応答を送信するには、Message.Ack() メソッドを使用します。

Java

確認応答を送信するには、AckReplyConsumer.ack() メソッドを使用します。

Python

確認応答を送信するには、Message.ack() メソッドを使用します。

サブスクライバーは、各メッセージの確認応答を行う必要があります。サブスクライバーは、最も古い未確認のメッセージを最初に受け取り、その後に後続の各メッセージを受け取ります。サブスクライバーが 1 つのメッセージをスキップして後続のメッセージを確認応答してから再接続すると、サブスクライバーは未確認メッセージと、それぞれの後続の確認応答メッセージを受信します。

Lite サブスクリプションには確認応答期限がなく、Pub/Sub Lite サービスは未確認のメッセージをオープン ストリーミング接続で再配信しません。

フロー制御の使用

Pub/Sub Lite サービスがサブスクライバーにメッセージを配信した後、サブスクライバーは確認応答されていないメッセージをメモリに保存します。フロー制御設定を使用して、サブスクライバーがメモリに格納できる未処理のメッセージの数を制限できます。フロー制御設定は、サブスクライバーがメッセージを受信する各パーティションに適用されます。

次のフロー制御設定を構成できます。

  • 未処理のメッセージ サイズ。未処理のメッセージの最大サイズ(バイト単位)。この最大サイズは、いちばん大きなメッセージのサイズより大きい必要があります。
  • メッセージの数。未処理のメッセージの最大数です。

メッセージのサイズは size_bytes フィールドに設定されてます。フロー制御設定はクライアント ライブラリで構成できます。

Go

フロー制御設定を構成するには、pscompat.NewSubscriberClientWithSettings を呼び出すときに ReceiveSettings を渡します。ReceiveSettings で次のパラメータを設定できます。

  • MaxOutstandingMessages

  • MaxOutstandingBytes

例については、こちらのフロー制御サンプルをご覧ください。

Java

フロー制御設定を構成するには、FlowControlRequest.Builder クラスの次のメソッドを使用します。

Python

フロー制御設定を構成するには、FlowControlSettings クラスに次のパラメータを設定します。

  • bytes_outstanding

  • messages_outstanding

たとえば、メッセージの最大数が 100 件で、サブスクライバーが 10 個のパーティションに接続する場合、サブスクライバーは 10 個のパーティションから 100 件を超えるメッセージを受信できません。未処理のメッセージの合計数が 100 件を超えることがありますが、サブスクライバーは各パーティションから 100 件を超えるメッセージを保存できません。