동시 실행 제어

이 문서에서는 주제에 게시된 메시지에서 동시 실행 제어를 사용하는 방법에 대해 설명합니다.

동시 실행 제어는 클라이언트 라이브러리 게시 메시지에 사용되는 백그라운드(I/O) 스레드의 기본 개수를 재정의하는 데 도움이 됩니다. 이렇게 하면 게시자 클라이언트가 병렬로 메시지를 전송할 수 있습니다.

동시 실행 제어는 Pub/Sub 상위 수준 클라이언트 라이브러리에서 사용할 수 있는 기능입니다. 하위 수준 라이브러리를 사용할 때도 자체 동시 실행 제어를 구현할 수 있습니다.

동시 실행 제어 지원은 클라이언트 라이브러리의 프로그래밍 언어에 따라 달라집니다. C++, Go, Java와 같은 동시 스레드를 지원하는 언어 구현의 경우 클라이언트 라이브러리가 스레드 수를 기본으로 선택합니다.

이 페이지에서는 동시 실행 제어의 개념과 게시자 클라이언트에 이 기능을 설정하는 방법을 설명합니다. 동시 실행 제어를 위해 구독자 클라이언트를 구성하려면 동시 실행 제어를 사용하여 추가 메시지 처리를 참고하세요.

시작하기 전에

게시 워크플로를 구성하기 전에 다음 작업이 완료되어야 합니다.

필요한 역할

주제에 메시지를 게시하는 데 필요한 권한을 얻으려면 관리자에게 주제에 대한 Pub/Sub 게시자(roles/pubsub.publisher) IAM 역할을 부여해 달라고 요청하세요. 역할 부여에 대한 자세한 내용은 프로젝트, 폴더, 조직에 대한 액세스 관리를 참조하세요.

커스텀 역할이나 다른 사전 정의된 역할을 통해 필요한 권한을 얻을 수도 있습니다.

주제 및 구독을 만들거나 업데이트하려면 추가 권한이 필요합니다.

동시 실행 제어 구성

동시 실행 제어 변수의 기본값과 변수 이름은 클라이언트 라이브러리마다 다를 수 있습니다. 예를 들어 Java 클라이언트 라이브러리에서 동시 실행 제어를 구성하는 메서드는 setExecutorProvider()setChannelProvider()입니다. 자세한 내용은 API 참조 문서를 참고하세요.

  • setExecutorProvider()를 사용하면 게시 응답 처리에 사용되는 실행자 제공업체를 맞춤설정할 수 있습니다. 예를 들어 실행자 제공업체를 여러 게시자 클라이언트에서 스레드 수가 제한된 단일 공유 실행자를 반환하는 실행자 제공업체로 변경할 수 있습니다. 이 구성은 생성되는 스레드 수를 제한하는 데 도움이 됩니다.

  • setChannelProvider()를 사용하면 Pub/Sub에 대한 연결을 여는 데 사용되는 채널 제공업체를 맞춤설정할 수 있습니다. 일반적으로 여러 게시자 클라이언트에서 동일한 채널을 사용하지 않는 한 이 값을 구성하지 않습니다. 너무 많은 클라이언트에서 채널을 재사용하면 GOAWAY 또는 ENHANCE_YOUR_CALM 오류가 발생할 수 있습니다. 애플리케이션 로그 또는 Cloud Logging에 이러한 오류가 표시되면 채널을 더 만듭니다.

동시 실행 제어용 코드 샘플

C++

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 C++ 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C++ API 참고 문서를 확인하세요.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::GrpcBackgroundThreadPoolSizeOption;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
  auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
  // Override the default number of background (I/O) threads. By default the
  // library uses `std::thread::hardware_concurrency()` threads.
  auto options = Options{}.set<GrpcBackgroundThreadPoolSizeOption>(8);
  auto publisher = pubsub::Publisher(
      pubsub::MakePublisherConnection(std::move(topic), std::move(options)));

  std::vector<future<void>> ids;
  for (char const* data : {"1", "2", "3", "go!"}) {
    ids.push_back(
        publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
            .then([data](future<StatusOr<std::string>> f) {
              auto s = f.get();
              if (!s) return;
              std::cout << "Sent '" << data << "' (" << *s << ")\n";
            }));
  }
  publisher.Flush();
  // Block until they are actually sent.
  for (auto& id : ids) id.get();
}

Go

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Go 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Go API 참고 문서를 참조하세요.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func publishSingleGoroutine(w io.Writer, projectID, topicID, msg string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// msg := "Hello World"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	t := client.Topic(topicID)
	t.PublishSettings.NumGoroutines = 1

	result := t.Publish(ctx, &pubsub.Message{Data: []byte(msg)})
	// Block until the result is returned and a server-generated
	// ID is returned for the published message.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("Get: %w", err)
	}
	fmt.Fprintf(w, "Published a message; msg ID: %v\n", id)
	return nil
}

자바

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Java 설정 안내를 따르세요. 자세한 내용은 Pub/Sub 자바 API 참조 문서를 참조하세요.


import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class PublishWithConcurrencyControlExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithConcurrencyControlExample(projectId, topicId);
  }

  public static void publishWithConcurrencyControlExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;
    List<ApiFuture<String>> messageIdFutures = new ArrayList<>();

    try {
      // Provides an executor service for processing messages. The default
      // `executorProvider` used by the publisher has a default thread count of
      // 5 * the number of processors available to the Java virtual machine.
      ExecutorProvider executorProvider =
          InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

      // `setExecutorProvider` configures an executor for the publisher.
      publisher = Publisher.newBuilder(topicName).setExecutorProvider(executorProvider).build();

      // schedule publishing one message at a time : messages get automatically batched
      for (int i = 0; i < 100; i++) {
        String message = "message " + i;
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Once published, returns a server-assigned message id (unique within the topic)
        ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
        messageIdFutures.add(messageIdFuture);
      }
    } finally {
      // Wait on any pending publish requests.
      List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();

      System.out.println("Published " + messageIds.size() + " messages with concurrency control.");

      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Ruby

이 샘플을 시도하기 전에 빠른 시작: 클라이언트 라이브러리 사용의 Ruby 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Ruby API 참고 문서를 참조하세요.

# topic_id = "your-topic-id"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic topic_id, async: {
  threads: {
    # Use exactly one thread for publishing message and exactly one thread
    # for executing callbacks
    publish:  1,
    callback: 1
  }
}
topic.publish_async "This is a test message." do |result|
  raise "Failed to publish the message." unless result.succeeded?
  puts "Message published asynchronously."
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!

다음 단계