주제 유형 변경

가져오기 주제를 표준 주제로 변환하거나 그 반대로 표준 주제를 가져오기 주제로 변환할 수 있습니다.

가져오기 주제를 표준 주제로 변환

가져오기 주제를 표준 주제로 변환하려면 수집 설정을 삭제합니다. 다음 단계를 수행합니다.

콘솔

  1. Google Cloud 콘솔에서 IAM 페이지로 이동합니다.

    주제로 이동

  2. 가져오기 주제를 클릭합니다.

  3. 주제 세부정보 페이지에서 수정을 클릭합니다.

  4. 수집 사용 설정 옵션을 삭제합니다.

  5. 업데이트를 클릭합니다.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. gcloud pubsub topics update 명령어를 실행합니다.

    gcloud pubsub topics update TOPIC_ID \
        --clear-ingestion-data-source-settings

    TOPIC_ID를 주제 ID로 바꿉니다.

표준 주제를 Amazon Kinesis Data Streams 가져오기 주제로 변환

표준 주제를 Amazon Kinesis Data Streams 가져오기 주제로 변환하려면 먼저 모든 기본 요건을 충족하는지 확인합니다.

콘솔

  1. Google Cloud 콘솔에서 IAM 페이지로 이동합니다.

    주제로 이동

  2. 가져오기 주제로 변환할 주제를 클릭합니다.

  3. 주제 세부정보 페이지에서 수정을 클릭합니다.

  4. 수집 사용 설정 옵션을 선택합니다.

  5. 수집 소스의 경우 Amazon Kinesis Data Streams를 선택합니다.

  6. 다음 세부정보를 입력합니다.

    • Kinesis 스트림 ARN: Pub/Sub로 수집하려는 Kinesis Data Stream의 ARN입니다. ARN 형식은 arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}입니다.

    • Kinesis 소비자 ARN: AWS Kinesis Data Stream에 등록된 소비자 리소스의 ARN입니다. ARN 형식은 arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}입니다.

    • AWS 역할 ARN: AWS 역할의 ARN입니다. 역할의 ARN 형식은 arn:aws:iam:${Account}:role/${RoleName}입니다.

    • 서비스 계정: Google Cloud에서 서비스 계정 만들기에서 만든 서비스 계정입니다.

  7. 업데이트를 클릭합니다.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 다음 샘플에 언급된 모든 플래그를 사용하여 gcloud pubsub topics update 명령어를 실행합니다.

    gcloud pubsub topics update TOPIC_ID \
         --kinesis-ingestion-stream-arn KINESIS_STREAM_ARN\
         --kinesis-ingestion-consumer-arn KINESIS_CONSUMER_ARN\
         --kinesis-ingestion-role-arn KINESIS_ROLE_ARN\
         --kinesis-ingestion-service-account PUBSUB_SERVICE_ACCOUNT
      

    다음을 바꿉니다.

    • TOPIC_ID는 주제 ID 또는 이름입니다. 이 필드는 업데이트할 수 없습니다.

    • KINESIS_STREAM_ARN은 Pub/Sub로 수집하려는 Kinesis Data Streams의 ARN입니다. ARN 형식은 arn:${Partition}:kinesis:${Region}:${Account}:stream/${StreamName}입니다.

    • KINESIS_CONSUMER_ARN은 AWS Kinesis Data Streams에 등록된 소비자 리소스의 ARN입니다. ARN 형식은 arn:${Partition}:kinesis:${Region}:${Account}:${StreamType}/${StreamName}/consumer/${ConsumerName}:${ConsumerCreationTimpstamp}입니다.

    • KINESIS_ROLE_ARN은 AWS 역할의 ARN입니다. 역할의 ARN 형식은 arn:aws:iam:${Account}:role/${RoleName}입니다.

    • PUBSUB_SERVICE_ACCOUNTGoogle Cloud에서 서비스 계정 만들기에서 만든 서비스 계정입니다.

Go

이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용Go 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Go API 참고 문서를 확인하세요.

Pub/Sub에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func updateTopicType(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	streamARN := "stream-arn"
	consumerARN := "consumer-arn"
	awsRoleARN := "aws-role-arn"
	gcpServiceAccount := "gcp-service-account"

	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	updateCfg := pubsub.TopicConfigToUpdate{
		// If wanting to clear ingestion settings, set this to zero value: &pubsub.IngestionDataSourceSettings{}
		IngestionDataSourceSettings: &pubsub.IngestionDataSourceSettings{
			Source: &pubsub.IngestionDataSourceAWSKinesis{
				StreamARN:         streamARN,
				ConsumerARN:       consumerARN,
				AWSRoleARN:        awsRoleARN,
				GCPServiceAccount: gcpServiceAccount,
			},
		},
	}
	topicCfg, err := client.Topic(topicID).Update(ctx, updateCfg)
	if err != nil {
		return fmt.Errorf("topic.Update: %w", err)
	}
	fmt.Fprintf(w, "Topic updated with kinesis source: %v\n", topicCfg)
	return nil
}

Java

이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용Java 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Java API 참고 문서를 확인하세요.

Pub/Sub에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.


import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.protobuf.FieldMask;
import com.google.pubsub.v1.IngestionDataSourceSettings;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import com.google.pubsub.v1.UpdateTopicRequest;
import java.io.IOException;

public class UpdateTopicTypeExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";
    // Kinesis ingestion settings.
    String streamArn = "stream-arn";
    String consumerArn = "consumer-arn";
    String awsRoleArn = "aws-role-arn";
    String gcpServiceAccount = "gcp-service-account";

    UpdateTopicTypeExample.updateTopicTypeExample(
        projectId, topicId, streamArn, consumerArn, awsRoleArn, gcpServiceAccount);
  }

  public static void updateTopicTypeExample(
      String projectId,
      String topicId,
      String streamArn,
      String consumerArn,
      String awsRoleArn,
      String gcpServiceAccount)
      throws IOException {
    try (TopicAdminClient topicAdminClient = TopicAdminClient.create()) {
      TopicName topicName = TopicName.of(projectId, topicId);

      IngestionDataSourceSettings.AwsKinesis awsKinesis =
          IngestionDataSourceSettings.AwsKinesis.newBuilder()
              .setStreamArn(streamArn)
              .setConsumerArn(consumerArn)
              .setAwsRoleArn(awsRoleArn)
              .setGcpServiceAccount(gcpServiceAccount)
              .build();
      IngestionDataSourceSettings ingestionDataSourceSettings =
          IngestionDataSourceSettings.newBuilder().setAwsKinesis(awsKinesis).build();

      // Construct the topic with Kinesis ingestion settings.
      Topic topic =
          Topic.newBuilder()
              .setName(topicName.toString())
              .setIngestionDataSourceSettings(ingestionDataSourceSettings)
              .build();

      // Construct a field mask to indicate which field to update in the topic.
      FieldMask updateMask =
          FieldMask.newBuilder().addPaths("ingestion_data_source_settings").build();

      UpdateTopicRequest request =
          UpdateTopicRequest.newBuilder().setTopic(topic).setUpdateMask(updateMask).build();

      Topic response = topicAdminClient.updateTopic(request);

      System.out.println(
          "Updated topic with Kinesis ingestion settings: " + response.getAllFields());
    }
  }
}

Node.js

이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참고 문서를 확인하세요.

Pub/Sub에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId,
  awsRoleArn,
  gcpServiceAccount,
  streamArn,
  consumerArn
) {
  const metadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

Python

이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용Python 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Python API 참고 문서를 확인하세요.

Pub/Sub에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

from google.cloud import pubsub_v1
from google.pubsub_v1.types import Topic
from google.pubsub_v1.types import IngestionDataSourceSettings
from google.pubsub_v1.types import UpdateTopicRequest
from google.protobuf import field_mask_pb2

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"
# stream_arn = "your-stream-arn"
# consumer_arn = "your-consumer-arn"
# aws_role_arn = "your-aws-role-arn"
# gcp_service_account = "your-gcp-service-account"

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

update_request = UpdateTopicRequest(
    topic=Topic(
        name=topic_path,
        ingestion_data_source_settings=IngestionDataSourceSettings(
            aws_kinesis=IngestionDataSourceSettings.AwsKinesis(
                stream_arn=stream_arn,
                consumer_arn=consumer_arn,
                aws_role_arn=aws_role_arn,
                gcp_service_account=gcp_service_account,
            )
        ),
    ),
    update_mask=field_mask_pb2.FieldMask(paths=["ingestion_data_source_settings"]),
)

topic = publisher.update_topic(request=update_request)
print(f"Updated topic: {topic.name} with AWS Kinesis Ingestion Settings")

C++

이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용C++ 설정 안내를 따르세요. 자세한 내용은 Pub/Sub C++ API 참고 문서를 확인하세요.

Pub/Sub에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

namespace pubsub = ::google::cloud::pubsub;
namespace pubsub_admin = ::google::cloud::pubsub_admin;
[](pubsub_admin::TopicAdminClient client, std::string project_id,
   std::string topic_id, std::string stream_arn, std::string consumer_arn,
   std::string aws_role_arn, std::string gcp_service_account) {
  google::pubsub::v1::UpdateTopicRequest request;

  request.mutable_topic()->set_name(
      pubsub::Topic(std::move(project_id), std::move(topic_id)).FullName());
  auto* aws_kinesis = request.mutable_topic()
                          ->mutable_ingestion_data_source_settings()
                          ->mutable_aws_kinesis();
  aws_kinesis->set_stream_arn(stream_arn);
  aws_kinesis->set_consumer_arn(consumer_arn);
  aws_kinesis->set_aws_role_arn(aws_role_arn);
  aws_kinesis->set_gcp_service_account(gcp_service_account);
  *request.mutable_update_mask()->add_paths() =
      "ingestion_data_source_settings";

  auto topic = client.UpdateTopic(request);
  if (!topic) throw std::move(topic).status();

  std::cout << "The topic was successfully updated: " << topic->DebugString()
            << "\n";
}

Node.js (TypeScript)

이 샘플을 사용해 보기 전에 Pub/Sub 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Pub/Sub Node.js API 참고 문서를 참조하세요.

Pub/Sub에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';
// const awsRoleArn = 'arn:aws:iam:...';
// const gcpServiceAccount = 'ingestion-account@...';
// const streamArn = 'arn:aws:kinesis:...';
// const consumerArn = 'arn:aws:kinesis:...';

// Imports the Google Cloud client library
import {PubSub, TopicMetadata} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function updateTopicIngestionType(
  topicNameOrId: string,
  awsRoleArn: string,
  gcpServiceAccount: string,
  streamArn: string,
  consumerArn: string
) {
  const metadata: TopicMetadata = {
    ingestionDataSourceSettings: {
      awsKinesis: {
        awsRoleArn,
        gcpServiceAccount,
        streamArn,
        consumerArn,
      },
    },
  };

  await pubSubClient.topic(topicNameOrId).setMetadata(metadata);

  console.log('Topic updated with Kinesis source successfully.');
}

ARN에 관한 자세한 내용은 Amazon 리소스 이름(ARN)IAM 식별자를 참조하세요.

표준 주제를 Cloud Storage 가져오기 주제로 변환

표준 주제를 Cloud Storage 가져오기 주제로 변환하려면 먼저 모든 기본 요건을 충족하는지 확인합니다.

콘솔

  1. Google Cloud 콘솔에서 IAM 페이지로 이동합니다.

    주제로 이동

  2. Cloud Storage 가져오기 주제로 변환할 주제를 클릭합니다.

  3. 주제 세부정보 페이지에서 수정을 클릭합니다.

  4. 수집 사용 설정 옵션을 선택합니다.

  5. 처리 소스로 Google Cloud Storage를 선택합니다.

  6. Cloud Storage 버킷에서 찾아보기를 클릭합니다.

    버킷 선택 페이지가 열립니다. 다음 옵션 중 하나를 선택합니다.

    • 적절한 프로젝트에서 기존 버킷을 선택합니다.

    • 만들기 아이콘을 클릭하고 화면에 표시된 안내에 따라 새 버킷을 만듭니다. 버킷을 만든 후 Cloud Storage 가져오기 주제의 버킷을 선택합니다.

  7. 버킷을 지정하면 Pub/Sub에서 Pub/Sub 서비스 계정의 버킷에 적절한 권한이 있는지 확인합니다. 권한 문제가 있으면 권한과 관련된 오류 메시지가 표시됩니다.

    권한 문제가 발생하면 권한 설정을 클릭합니다. 자세한 내용은 Pub/Sub 서비스 계정에 Cloud Storage 권한 부여를 참고하세요.

  8. 객체 형식에서 텍스트, Avro 또는 Pub/Sub 아브로를 선택합니다.

    텍스트를 선택하는 경우 원하는 경우 객체를 메시지로 분할하는 구분자를 지정할 수 있습니다.

    이러한 옵션에 대한 자세한 내용은 입력 형식을 참고하세요.

  9. 선택사항입니다. 주제의 최소 객체 생성 시간을 지정할 수 있습니다. 이 속성을 설정하면 최소 객체 생성 시간 이후에 생성된 객체만 처리됩니다.

    자세한 내용은 최소 객체 생성 시간을 참고하세요.

  10. glob 패턴을 지정해야 합니다. 버킷의 모든 객체를 수집하려면 **를 glob 패턴으로 사용하세요. 지정된 패턴과 일치하는 객체만 처리됩니다.

    자세한 내용은 glob 패턴 일치를 참고하세요.

  11. 다른 기본 설정은 유지합니다.
  12. 주제 업데이트를 클릭합니다.

gcloud

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 가져오기 주제의 설정이 손실되지 않도록 주제를 업데이트할 때마다 모든 설정을 포함해야 합니다. 누락된 항목이 있으면 Pub/Sub에서 설정을 원래 기본값으로 재설정합니다.

    다음 샘플에 언급된 모든 플래그를 사용하여 gcloud pubsub topics update 명령어를 실행합니다.

    gcloud pubsub topics update TOPIC_ID \
        --cloud-storage-ingestion-bucket=BUCKET_NAME\
        --cloud-storage-ingestion-input-format=INPUT_FORMAT\
        --cloud-storage-ingestion-text-delimiter=TEXT_DELIMITER\
        --cloud-storage-ingestion-minimum-object-create-time=MINIMUM_OBJECT_CREATE_TIME\
        --cloud-storage-ingestion-match-glob=MATCH_GLOB

    다음을 바꿉니다.

    • TOPIC_ID는 주제 ID 또는 이름입니다. 이 필드는 업데이트할 수 없습니다.

    • BUCKET_NAME: 기존 버킷의 이름을 지정합니다. 예를 들면 prod_bucket입니다. 버킷 이름에 프로젝트 ID를 포함해서는 안 됩니다. 버킷을 만들려면 버킷 만들기를 참조하세요.

    • INPUT_FORMAT: 처리되는 객체의 형식을 지정합니다. 이 값은 text, avro 또는 pubsub_avro입니다. 이러한 옵션에 대한 자세한 내용은 입력 형식을 참고하세요.

    • TEXT_DELIMITER: 텍스트 객체를 Pub/Sub 메시지로 분할하는 구분자를 지정합니다. 단일 문자여야 하며 INPUT_FORMATtext인 경우에만 설정해야 합니다. 기본값은 줄바꿈 문자 (\n)입니다.

      gcloud CLI를 사용하여 구분자를 지정할 때는 새 줄 \n과 같은 특수 문자의 처리에 각별히 주의하세요. 구분자가 올바르게 해석되도록 '\n' 형식을 사용합니다. 따옴표나 이스케이프 없이 \n를 사용하면 구분자가 "n"이 됩니다.

    • MINIMUM_OBJECT_CREATE_TIME: 객체를 처리하기 위해 객체가 생성된 최소 시간을 지정합니다. UTC 기준 YYYY-MM-DDThh:mm:ssZ 형식이어야 합니다. 예를 들면 2024-10-14T08:30:30Z입니다.

      0001-01-01T00:00:00Z9999-12-31T23:59:59Z 사이의 모든 날짜(과거 또는 미래)가 유효합니다.

    • MATCH_GLOB: 객체를 처리하기 위해 일치시킬 glob 패턴을 지정합니다. gcloud CLI를 사용하는 경우 * 문자가 포함된 일치 글러브에는 * 문자가 \*\*.txt 형식으로 이스케이프 처리된 형식이거나 전체 일치 글러브가 따옴표 "**.txt" 또는 '**.txt'로 묶여 있어야 합니다. glob 패턴에 지원되는 문법에 관한 자세한 내용은 Cloud Storage 문서를 참고하세요.