Lite 구독에서 메시지 수신

이 페이지에서는 라이트 구독에서 메시지를 수신하는 방법을 설명합니다. 자바용 Pub/Sub 라이트 클라이언트 라이브러리를 사용하여 메시지를 수신할 수 있습니다.

라이트 구독은 라이트 주제를 구독자 애플리케이션에 연결합니다. 구독자는 라이트 구독에서 메시지를 수신합니다. 구독자는 라이트 구독을 만들기 전에 게시자가 보내는 메시지를 포함하여 게시자 애플리케이션이 라이트 주제로 보내는 모든 메시지를 수신합니다.

라이트 구독에서 메시지를 수신하기 전에 라이트 주제를 만들고 라이트 주제에 대한 라이트 구독을 만들고 라이트 주제에 메시지를 게시합니다.

메시지 수신

라이트 구독에서 메시지를 수신하려면 라이트 구독에서 메시지를 요청합니다. 클라이언트 라이브러리는 라이트 구독에 연결된 라이트 주제의 파티션에 자동으로 연결됩니다. 둘 이상의 구독자 클라이언트가 인스턴스화되면 메시지가 모든 클라이언트에 배포됩니다. 주제의 파티션 수에 따라 구독에 동시에 연결할 수 있는 최대 구독자 클라이언트 수가 결정됩니다.

구독자가 메시지를 초기화하고 수신을 시작하려면 최대 1분까지 걸릴 수 있습니다. 초기화 후에는 최소한의 지연 시간으로 메시지가 수신됩니다.

다음 샘플은 라이트 구독에서 메시지를 수신하는 방법을 보여줍니다.

gcloud

이 명령어를 사용하려면 Python 3.6 이상이 필요하며 grpcio Python 패키지를 설치해야 합니다. MacOS, Linux, Cloud Shell 사용자의 경우 다음을 실행합니다

sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1

메시지를 수신하려면 gcloud pubsub lite-subscriptions subscribe 명령어를 사용합니다.

gcloud pubsub lite-subscriptions subscribe SUBSCRIPTION_ID \
    --location=LITE_LOCATION \
    --auto-ack

다음을 바꿉니다.

  • SUBSCRIPTION_ID: 라이트 구독의 ID
  • LITE_LOCATION: 라이트 구독의 위치

Go

이 샘플을 실행하기 전에 Pub/Sub Lite 클라이언트 라이브러리의 Go 설정 안내를 따르세요.


package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"sync/atomic"
	"time"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
)

func main() {
	// NOTE: Set these flags for an existing Pub/Sub Lite subscription containing
	// published messages when running this sample.
	projectID := flag.String("project_id", "", "Cloud Project ID")
	zone := flag.String("zone", "", "Cloud Zone where the topic resides, e.g. us-central1-a")
	subscriptionID := flag.String("subscription_id", "", "Existing Pub/Sub Lite subscription")
	timeout := flag.Duration("timeout", 90*time.Second, "The duration to receive messages")
	flag.Parse()

	ctx := context.Background()
	subscriptionPath := fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", *projectID, *zone, *subscriptionID)

	// Configure flow control settings. These settings apply per partition.
	// The message stream is paused based on the maximum size or number of
	// messages that the subscriber has already received, whichever condition is
	// met first.
	settings := pscompat.ReceiveSettings{
		// 10 MiB. Must be greater than the allowed size of the largest message
		// (1 MiB).
		MaxOutstandingBytes: 10 * 1024 * 1024,
		// 1,000 outstanding messages. Must be > 0.
		MaxOutstandingMessages: 1000,
	}

	// Create the subscriber client.
	subscriber, err := pscompat.NewSubscriberClientWithSettings(ctx, subscriptionPath, settings)
	if err != nil {
		log.Fatalf("pscompat.NewSubscriberClientWithSettings error: %v", err)
	}

	// Listen for messages until the timeout expires.
	log.Printf("Listening to messages on %s for %v...\n", subscriptionPath, *timeout)
	cctx, cancel := context.WithTimeout(ctx, *timeout)
	defer cancel()
	var receiveCount int32

	// Receive blocks until the context is cancelled or an error occurs.
	if err := subscriber.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
		// NOTE: May be called concurrently; synchronize access to shared memory.
		atomic.AddInt32(&receiveCount, 1)

		// Metadata decoded from the message ID contains the partition and offset.
		metadata, err := pscompat.ParseMessageMetadata(msg.ID)
		if err != nil {
			log.Fatalf("Failed to parse %q: %v", msg.ID, err)
		}

		fmt.Printf("Received (partition=%d, offset=%d): %s\n", metadata.Partition, metadata.Offset, string(msg.Data))
		msg.Ack()
	}); err != nil {
		log.Fatalf("SubscriberClient.Receive error: %v", err)
	}

	fmt.Printf("Received %d messages\n", receiveCount)
}

자바

이 샘플을 실행하기 전에 Pub/Sub 라이트 클라이언트 라이브러리의 자바 설정 안내를 따르세요.

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.cloudpubsub.MessageTransforms;
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
import com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings;
import com.google.protobuf.Timestamp;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscriberExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing subscription for the subscribe example to work.
    String subscriptionId = "your-subscription-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    subscriberExample(cloudRegion, zoneId, projectNumber, subscriptionId, regional);
  }

  public static void subscriberExample(
      String cloudRegion, char zoneId, long projectNumber, String subscriptionId, boolean regional)
      throws ApiException {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    SubscriptionPath subscriptionPath =
        SubscriptionPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    // The message stream is paused based on the maximum size or number of messages that the
    // subscriber has already received, whichever condition is met first.
    FlowControlSettings flowControlSettings =
        FlowControlSettings.builder()
            // 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
            .setBytesOutstanding(10 * 1024 * 1024L)
            // 1,000 outstanding messages. Must be >0.
            .setMessagesOutstanding(1000L)
            .build();

    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          System.out.println("Id : " + MessageMetadata.decode(message.getMessageId()));
          System.out.println("Data : " + message.getData().toStringUtf8());
          System.out.println("Ordering key : " + message.getOrderingKey());
          System.out.println("Attributes : ");
          message
              .getAttributesMap()
              .forEach(
                  (key, value) -> {
                    if (key == MessageTransforms.PUBSUB_LITE_EVENT_TIME_TIMESTAMP_PROTO) {
                      Timestamp ts = MessageTransforms.decodeAttributeEventTime(value);
                      System.out.println(key + " = " + ts.toString());
                    } else {
                      System.out.println(key + " = " + value);
                    }
                  });

          // Acknowledge the message.
          consumer.ack();
        };

    SubscriberSettings subscriberSettings =
        SubscriberSettings.newBuilder()
            .setSubscriptionPath(subscriptionPath)
            .setReceiver(receiver)
            // Flow control settings are set at the partition level.
            .setPerPartitionFlowControlSettings(flowControlSettings)
            .build();

    Subscriber subscriber = Subscriber.create(subscriberSettings);

    // Start the subscriber. Upon successful starting, its state will become RUNNING.
    subscriber.startAsync().awaitRunning();

    System.out.println("Listening to messages on " + subscriptionPath.toString() + "...");

    try {
      System.out.println(subscriber.state());
      // Wait 90 seconds for the subscriber to reach TERMINATED state. If it encounters
      // unrecoverable errors before then, its state will change to FAILED and an
      // IllegalStateException will be thrown.
      subscriber.awaitTerminated(90, TimeUnit.SECONDS);
    } catch (TimeoutException t) {
      // Shut down the subscriber. This will change the state of the subscriber to TERMINATED.
      subscriber.stopAsync().awaitTerminated();
      System.out.println("Subscriber is shut down: " + subscriber.state());
    }
  }
}

Python

이 샘플을 실행하기 전에 Pub/Sub Lite 클라이언트 라이브러리의 Python 설정 안내를 따르세요.

from concurrent.futures._base import TimeoutError
from google.pubsub_v1 import PubsubMessage
from google.cloud.pubsublite.cloudpubsub import SubscriberClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    FlowControlSettings,
    MessageMetadata,
    SubscriptionPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# subscription_id = "your-subscription-id"
# timeout = 90
# regional = True

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

subscription_path = SubscriptionPath(project_number, location, subscription_id)
# Configure when to pause the message stream for more incoming messages based on the
# maximum size or number of messages that a single-partition subscriber has received,
# whichever condition is met first.
per_partition_flow_control_settings = FlowControlSettings(
    # 1,000 outstanding messages. Must be >0.
    messages_outstanding=1000,
    # 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
    bytes_outstanding=10 * 1024 * 1024,
)

def callback(message: PubsubMessage):
    message_data = message.data.decode("utf-8")
    metadata = MessageMetadata.decode(message.message_id)
    print(
        f"Received {message_data} of ordering key {message.ordering_key} with id {metadata}."
    )
    message.ack()

# SubscriberClient() must be used in a `with` block or have __enter__() called before use.
with SubscriberClient() as subscriber_client:

    streaming_pull_future = subscriber_client.subscribe(
        subscription_path,
        callback=callback,
        per_partition_flow_control_settings=per_partition_flow_control_settings,
    )

    print(f"Listening for messages on {str(subscription_path)}...")

    try:
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError or KeyboardInterrupt:
        streaming_pull_future.cancel()
        assert streaming_pull_future.done()

클라이언트 라이브러리는 라이트 주제의 각 파티션에 대한 양방향 스트리밍 연결을 설정합니다.

  1. 구독자가 파티션에 연결을 요청합니다.

  2. Pub/Sub 라이트 서비스가 구독자에게 메시지를 전달합니다.

구독자는 메시지를 처리한 후 메시지를 확인해야 합니다. 클라이언트 라이브러리는 콜백에서 비동기식으로 메시지를 처리하고 확인합니다. 구독자가 메모리에 저장할 수 있는 확인되지 않은 메시지 수를 제한하려면 흐름 제어 설정을 구성합니다.

여러 구독자가 동일한 라이트 구독에서 메시지를 수신하는 경우 Pub/Sub 라이트 서비스는 각 구독자를 동일한 비율로 파티션에 연결합니다. 예를 들어 2명의 구독자가 동일한 라이트 구독을 사용하고 라이트 구독이 두 개의 파티션이 있는 라이트 주제에 연결된 경우 각 구독자는 파티션 중 하나에서 메시지를 수신합니다.

메시지 확인

메시지를 확인하려면 라이트 구독에 확인을 전송합니다.

Go

확인을 전송하려면 Message.Ack() 메서드를 사용합니다.

자바

확인을 전송하려면 AckReplyConsumer.ack() 메서드를 사용합니다.

Python

확인을 전송하려면 Message.ack() 메서드를 사용합니다.

구독자는 모든 메시지를 확인해야 합니다. 구독자는 가장 오래된 미확인 메시지를 먼저 수신한 다음 각 후속 메시지를 수신합니다. 구독자가 하나의 메시지를 건너뛰고 후속 메시지를 확인한 다음 다시 연결하는 경우 구독자는 미확인 메시지와 각 후속 확인 메시지를 수신합니다.

라이트 구독에는 확인 기한이 없으며 Pub/Sub 라이트 서비스는 공개 스트리밍 연결을 통해 확인되지 않은 메시지를 다시 전송하지 않습니다.

흐름 제어 사용

Pub/Sub 라이트 서비스가 구독자에게 메시지를 보내면 구독자는 미확인 메시지를 메모리에 저장합니다. 구독자는 흐름 제어 설정을 사용하여 메모리에 저장할 수 있는 대기 중인 메시지 수를 제한할 수 있습니다. 흐름 제어 설정은 구독자가 메시지를 수신하는 각 파티션에 적용됩니다.

다음과 같은 흐름 제어 설정을 구성할 수 있습니다.

  • 대기 중인 메시지 크기. 대기 중인 메시지의 최대 크기(바이트)입니다. 최대 크기는 가장 큰 메시지의 크기보다 커야 합니다.
  • 메시지 수. 대기 중인 메시지의 최대 개수입니다.

메시지 크기는 size_bytes 필드에 있습니다. 클라이언트 라이브러리를 사용하여 흐름 제어 설정을 구성할 수 있습니다.

Go

흐름 제어 설정을 구성하려면 pscompat.NewSubscriberClientWithSettings를 호출할 때 ReceiveSettings를 전달합니다. ReceiveSettings에서 다음 매개변수를 설정할 수 있습니다.

  • MaxOutstandingMessages

  • MaxOutstandingBytes

예시를 보려면 이 흐름 제어 샘플을 참조하세요.

자바

흐름 제어 설정을 구성하려면 FlowControlRequest.Builder 클래스에서 다음 메서드를 사용합니다.

Python

흐름 제어 설정을 구성하려면 FlowControlSettings 클래스에서 다음 매개변수를 설정합니다.

  • bytes_outstanding

  • messages_outstanding

예를 들어 최대 메시지 수가 100이고 구독자가 10개의 파티션에 연결된 경우 구독자는 10개의 파티션에서 100개를 초과하여 메시지를 수신할 수 없습니다. 대기 중인 메시지 총 개수는 100개를 초과할 수 있지만 구독자는 각 파티션에 100개를 초과하는 메시지를 저장할 수 없습니다.