Kafka Connect Bigtable 싱크 커넥터


싱크 커넥터는 Kafka Connect 프레임워크용 플러그인으로, Kafka에서 다른 시스템으로 데이터를 직접 스트리밍하여 저장하고 처리하는 데 사용할 수 있습니다. Kafka Connect Bigtable 싱크는 지연 시간을 최소화하여 Bigtable로 데이터를 실시간 스트리밍하도록 설계된 전용 커넥터입니다.

이 페이지에서는 커넥터의 기능과 제한사항을 설명합니다. 또한 단일 메시지 변환 (SMT) 및 자동 테이블 생성과 같은 고급 시나리오의 사용 예도 제공합니다. 설치 안내 및 전체 참조 문서는 Kafka Connect Bigtable 싱크 커넥터 저장소를 참고하세요.

기능

Bigtable 싱크 커넥터는 Kafka 주제를 구독하고, 이러한 주제에서 수신된 메시지를 읽은 다음, 데이터를 Bigtable 테이블에 씁니다. 다음 섹션에서는 각 기능에 대해 간략하게 설명합니다. 사용에 관한 자세한 내용은 이 문서의 구성 섹션을 참고하세요.

키 매핑, SMT, 변환기

Bigtable 테이블에 데이터를 쓰려면 각 작업에 대해 고유한 행 키, 열 패밀리, 열 이름을 제공해야 합니다. 이 정보는 Kafka 메시지의 필드에서 추론됩니다. row.key.definition, row.key.delimiter 또는 default.column.family과 같은 설정으로 필요한 모든 식별자를 구성할 수 있습니다.

자동 테이블 생성

auto.create.tablesauto.create.column.families 설정을 사용하여 Bigtable 대상에 대상 테이블과 column family가 없는 경우 자동으로 만들 수 있습니다. 이러한 유연성에는 특정 성능 비용이 따르므로 일반적으로 데이터를 스트리밍하려는 테이블을 먼저 만드는 것이 좋습니다.

쓰기 모드 및 행 삭제

테이블에 쓸 때 행이 이미 있는 경우 데이터를 완전히 덮어쓰거나 insert.mode 설정을 사용하여 작업을 중단할 수 있습니다. 이 설정을 DLQ 오류 처리와 함께 활용하여 최소 한 번 전송을 보장할 수 있습니다.

DELETE 명령어를 실행하려면 value.null.mode 속성을 구성합니다. 전체 행, 열 그룹 또는 개별 열을 삭제하는 데 사용할 수 있습니다.

데드 레터 큐

errors.deadletterqueue.topic.name 속성을 구성하고 errors.tolerance=all를 설정하여 처리되지 않은 메시지를 DLQ 주제에 게시합니다.

Confluent Platform Bigtable 싱크 커넥터와의 호환성

Google Cloud의 Bigtable Kafka Connect 싱크 커넥터는 Google Cloud 자체 관리형 Confluent Platform Bigtable 싱크 커넥터와 완전히 동일합니다. connector.class 설정을 connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector로 조정하여 Confluent Platform 커넥터의 기존 구성 파일을 사용할 수 있습니다.

제한사항

다음과 같은 제한사항이 적용됩니다.

  • Kafka Connect Bigtable 싱크 커넥터는 현재 커넥터를 독립적으로 설치할 수 있는 Kafka 클러스터(자체 관리 또는 온프레미스 Kafka 클러스터)에서만 지원됩니다. 이 커넥터는 현재 Apache Kafka용 Google Cloud 관리형 서비스에서 지원되지 않습니다.

  • 이 커넥터는 최대 두 개의 중첩 수준에서 필드 이름으로 column family와 열을 만들 수 있습니다.

    • 2단계보다 깊이 중첩된 구조체는 JSON로 변환되어 상위 열에 저장됩니다.
    • 루트 수준 구조체는 column family로 변환됩니다. 이러한 구조체의 필드는 열 이름이 됩니다.
    • 루트 수준 기본값은 기본적으로 Kafka 주제를 이름으로 사용하는 column family에 저장됩니다. 이 패밀리의 열 이름은 필드 이름과 같습니다. default.column.familydefault.column.qualifier 설정을 사용하여 이 동작을 수정할 수 있습니다.

설치

이 커넥터를 설치하려면 표준 설치 단계를 따르세요. Maven으로 프로젝트를 빌드하고, .jar 파일을 Kafka Connect 플러그인 디렉터리에 복사하고, 구성 파일을 만듭니다. 단계별 안내는 저장소의 커넥터 실행 섹션을 참고하세요.

구성

Kafka Connect 커넥터를 구성하려면 구성 파일을 작성해야 합니다. Google Cloud의 Bigtable Kafka Connect 싱크 커넥터는 모든 기본 Kafka 커넥터 속성과 Bigtable 테이블 작업을 위해 맞춤설정된 일부 추가 필드를 지원합니다.

다음 섹션에서는 고급 사용 사례에 관한 자세한 예를 제공하지만 사용 가능한 모든 설정을 설명하지는 않습니다. 기본 사용 예시와 전체 속성 참조는 Kafka Connect Bigtable 싱크 커넥터 저장소를 참고하세요.

예: 유연한 행 키 및 열 패밀리 생성

샘플 시나리오

수신 Kafka 메시지에는 사용자 식별자가 포함된 쇼핑 주문의 세부정보가 포함됩니다. 각 주문을 두 개의 열 패밀리가 있는 행에 작성하려고 합니다. 하나는 사용자 세부정보용이고 다른 하나는 주문 세부정보용입니다.

소스 Kafka 메시지 형식

다음 구조를 달성하기 위해 JsonConverter를 사용하여 주제에 게시된 Kafka 메시지를 형식화합니다.

{
  "user": "user123",
  "phone": "800‑555‑0199",
  "email": "business@example.com",
  "order": {
    id: "order123",
    items: ["itemUUID1", "itemUUID2"],
    discount: 0.2
  }
}
예상 Bigtable 행

각 메시지를 다음 구조를 사용하여 Bigtable 행으로 작성하려고 합니다.

row key contact_details order_details
name 전화 이메일 orderId items discount
user123#order123 user123 800‑555‑0199 business@example.com order123 ["itemUUID1", "itemUUID2"] 0.2
커넥터 구성
예상 결과를 얻으려면 다음 구성 파일을 작성합니다.
# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for full settings reference.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Name of the topic where shopping details are posted
topics=shopping_topic

# Settings for row key creation
row.key.definition=user,order.id
row.key.delimiter=#

# All user identifiers are root level fields.
# Use the default column family to aggregate them into a single family.
default.column.family=contact_details

# Use SMT to rename "orders" field into "order_details" for the new column family
transforms=renameOrderField
transforms.renameOrderField.type=org.apache.kafka.connect.transforms.ReplaceField$Key
transforms.renameOrderField.renames=order:order_details
이 파일을 사용한 결과는 다음과 같습니다.
  • row.key.definition=user,order.id는 행 키를 구성하는 데 사용할 필드를 쉼표로 구분한 목록입니다. 각 항목은 row.key.delimiter 설정의 문자 집합과 연결됩니다.

    row.key.definition를 사용하는 경우 모든 메시지가 동일한 스키마를 사용해야 합니다. 구조가 다른 메시지를 서로 다른 열이나 열 패밀리로 처리해야 하는 경우 별도의 커넥터 인스턴스를 만드는 것이 좋습니다. 자세한 내용은 이 문서의 예: 여러 테이블에 메시지 쓰기 섹션을 참고하세요.

  • Bigtable column family 이름은 null이 아닌 루트 수준 구조체의 이름을 기반으로 합니다. 따라서

    • 연락처 세부정보의 값은 루트 수준 기본 데이터 유형이므로 default.column.family=contact_details 설정을 사용하여 기본 열 패밀리로 집계합니다.
    • 주문 세부정보는 이미 order 객체로 래핑되어 있지만 order_details을 열 패밀리 이름으로 사용하려고 합니다. 이를 위해 ReplaceFields SMT를 사용하여 필드 이름을 바꿉니다.

예: 자동 테이블 생성 및 동일한 쓰기

샘플 시나리오

수신 Kafka 메시지에 쇼핑 주문 세부정보가 포함되어 있습니다. 고객은 주문 처리 전에 장바구니를 수정할 수 있으므로 변경된 주문이 포함된 후속 메시지를 받게 되며, 이를 동일한 행에 업데이트로 저장해야 합니다. 쓰기 시점에 대상 테이블이 존재한다고 보장할 수도 없으므로 커넥터가 테이블이 없는 경우 자동으로 테이블을 생성하도록 해야 합니다.

커넥터 구성
예상 결과를 얻으려면 다음 구성 파일을 작성합니다.
# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for full settings reference.

# Settings for row key creation also omitted.
# Refer to the Example: flexible row key and column family creation section.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Name of the topic where shopping details are posted
topics=shopping_topic

# Automatically create destination tables if they don't exist
auto.create.tables=true

# UPSERT causes subsequent writes to overwrite existing rows.
# This way you can update the same order when customers change the contents
# of their baskets.
insert.mode=upsert

예: 여러 테이블에 메시지 쓰기

샘플 시나리오

수신되는 Kafka 메시지에는 다양한 주문 처리 채널의 쇼핑 주문에 관한 세부정보가 포함되어 있습니다. 이러한 메시지는 서로 다른 주제에 게시되며 동일한 구성 파일을 사용하여 별도의 테이블에 작성하려고 합니다.

커넥터 구성

여러 테이블에 메시지를 쓸 수 있지만 설정에 단일 구성 파일을 사용하는 경우 각 메시지는 동일한 스키마를 사용해야 합니다. 서로 다른 주제의 메시지를 별도의 열이나 패밀리로 처리해야 하는 경우 별도의 커넥터 인스턴스를 만드는 것이 좋습니다.

예상 결과를 얻으려면 다음 구성 파일을 작성합니다.

# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for full settings reference.

# Settings for row key creation are also omitted.
# Refer to the Example: flexible row key and column family creation section.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Name of the topics where shopping details are posted
topics=shopping_topic_store1,shopping_topic_store2

# Use a dynamic table name based on the Kafka topic name.
table.name.format=orders_${topic}

이 접근 방식에서는 table.name.format=orders_${topic} 속성을 사용하여 각 Kafka 주제 이름을 동적으로 참조합니다. topics=shopping_topic_store1,shopping_topic_store2 설정으로 여러 주제 이름을 구성하면 각 메시지가 별도의 테이블에 작성됩니다.

  • shopping_topic_store1 주제의 메시지는 orders_shopping_topic_store1 테이블에 기록됩니다.
  • shopping_topic_store2 주제의 메시지는 orders_shopping_topic_store2 테이블에 기록됩니다.

다음 단계