Konektor sink Bigtable Kafka Connect


Konektor sink adalah plugin untuk framework Kafka Connect yang dapat Anda gunakan untuk melakukan streaming data dari Kafka langsung ke sistem lain untuk penyimpanan dan pemrosesan. Sink Kafka Connect Bigtable adalah konektor khusus yang dirancang untuk melakukan streaming data ke Bigtable secara real time dengan latensi sekecil mungkin.

Halaman ini menjelaskan fitur dan batasan konektor. Artikel ini juga memberikan contoh penggunaan untuk skenario lanjutan dengan Transformasi Pesan Tunggal (SMT) dan pembuatan tabel otomatis. Untuk mengetahui petunjuk penginstalan dan dokumentasi referensi lengkap, lihat repositori Kafka Connect Bigtable Sink Connector.

Fitur

Konektor sink Bigtable berlangganan topik Kafka Anda, membaca pesan yang diterima di topik ini, lalu menulis data ke tabel Bigtable. Bagian berikut memberikan ringkasan tingkat tinggi untuk setiap fitur. Untuk mengetahui detail penggunaan, lihat bagian Konfigurasi dalam dokumen ini.

Pemetaan tombol, SMT, dan konverter

Untuk menulis data ke tabel Bigtable, Anda harus memberikan kunci baris, grup kolom, dan nama kolom yang unik untuk setiap operasi. Informasi ini disimpulkan dari kolom dalam pesan Kafka. Anda dapat membuat semua ID yang diperlukan dengan setelan seperti row.key.definition, row.key.delimiter, atau default.column.family.

Pembuatan tabel otomatis

Anda dapat menggunakan setelan auto.create.tables dan auto.create.column.families untuk otomatis membuat tabel tujuan dan grup kolom jika tidak ada di tujuan Bigtable Anda. Fleksibilitas ini memiliki biaya performa tertentu, jadi sebaiknya Anda membuat tabel terlebih dahulu tempat Anda ingin melakukan streaming data.

Mode penulisan dan menghapus baris

Saat menulis ke tabel, Anda dapat menimpa data sepenuhnya jika baris sudah ada, atau memilih untuk menghentikan operasi dengan setelan insert.mode. Anda dapat memanfaatkan setelan ini bersama dengan penanganan error DLQ untuk mendapatkan jaminan pengiriman setidaknya sekali.

Untuk mengeluarkan perintah DELETE, konfigurasi properti value.null.mode. Anda dapat menggunakannya untuk menghapus seluruh baris, family kolom, atau kolom individual.

Antrean Pesan yang Tidak Terkirim

Konfigurasi properti errors.deadletterqueue.topic.name dan tetapkan errors.tolerance=all untuk memposting pesan yang gagal diproses ke topik DLQ Anda.

Kompatibilitas dengan Confluent Platform Bigtable Sink Connector

Konektor sink Kafka Connect Bigtable dari Google Cloud menawarkan paritas penuh dengan Konektor Sink Bigtable Confluent Platform yang dikelola sendiri. Anda dapat menggunakan file konfigurasi yang ada untuk konektor Confluent Platform dengan menyesuaikan setelan connector.class menjadi connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector.

Batasan

Batasan berikut berlaku:

  • Konektor sink Bigtable Kafka Connect saat ini hanya didukung untuk cluster Kafka tempat Anda dapat menginstal konektor secara independen (cluster Kafka yang dikelola sendiri atau lokal). Konektor ini saat ini tidak didukung untuk Google Cloud Managed Service for Apache Kafka.

  • Konektor ini dapat membuat kolom dan kolom keluarga dari nama kolom hingga dua tingkat bersarang:

    • Struct yang bertingkat lebih dari dua tingkat dikonversi menjadi JSON dan disimpan di kolom induknya.
    • Struktur tingkat root diubah menjadi grup kolom. Kolom dalam struct tersebut menjadi nama kolom.
    • Nilai primitif tingkat root secara default disimpan ke grup kolom yang menggunakan topik Kafka sebagai namanya. Kolom dalam grup tersebut memiliki nama yang sama dengan nama kolom. Anda dapat mengubah perilaku ini menggunakan setelan default.column.family dan default.column.qualifier.

Penginstalan

Untuk menginstal konektor ini, ikuti langkah-langkah penginstalan standar: buat project dengan Maven, salin file .jar ke direktori plugin Kafka Connect, dan buat file konfigurasi. Untuk mengetahui petunjuk langkah demi langkah, lihat bagian Menjalankan konektor di repositori.

Konfigurasi

Untuk mengonfigurasi konektor Kafka Connect, Anda perlu menulis file konfigurasi. Konektor sink Kafka Connect Bigtable dari Google Cloud mendukung semua properti konektor Kafka dasar, serta beberapa kolom tambahan yang disesuaikan untuk bekerja dengan tabel Bigtable.

Bagian berikut memberikan contoh mendetail untuk kasus penggunaan yang lebih canggih, tetapi tidak menjelaskan semua setelan yang tersedia. Untuk contoh penggunaan dasar dan referensi properti lengkap, lihat repositori Kafka Connect Bigtable Sink Connector.

Contoh: pembuatan kunci baris dan family kolom yang fleksibel

Contoh skenario

Pesan Kafka masuk Anda berisi detail untuk pesanan belanja dengan ID pengguna. Anda ingin menulis setiap pesanan ke baris dengan dua family kolom: satu untuk detail pengguna, satu untuk detail pesanan.

Format pesan Kafka sumber

Anda memformat pesan Kafka yang diposting ke topik dengan JsonConverter untuk mendapatkan struktur berikut:

{
  "user": "user123",
  "phone": "800‑555‑0199",
  "email": "business@example.com",
  "order": {
    id: "order123",
    items: ["itemUUID1", "itemUUID2"],
    discount: 0.2
  }
}
Baris Bigtable yang diharapkan

Anda ingin menulis setiap pesan sebagai baris Bigtable dengan struktur berikut:

Row key contact_details order_details
nama telepon email orderId item diskon
user123#order123 user123 800‑555‑0199 business@example.com order123 ["itemUUID1", "itemUUID2"] 0,2
Konfigurasi konektor
Untuk mendapatkan hasil yang diharapkan, Anda menulis file konfigurasi berikut:
# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for full settings reference.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Name of the topic where shopping details are posted
topics=shopping_topic

# Settings for row key creation
row.key.definition=user,order.id
row.key.delimiter=#

# All user identifiers are root level fields.
# Use the default column family to aggregate them into a single family.
default.column.family=contact_details

# Use SMT to rename "orders" field into "order_details" for the new column family
transforms=renameOrderField
transforms.renameOrderField.type=org.apache.kafka.connect.transforms.ReplaceField$Key
transforms.renameOrderField.renames=order:order_details
Hasil penggunaan file ini adalah sebagai berikut:
  • row.key.definition=user,order.id adalah daftar kolom yang dipisahkan koma yang ingin Anda gunakan untuk membuat kunci baris. Setiap entri digabungkan dengan set karakter dalam setelan row.key.delimiter.

    Saat Anda menggunakan row.key.definition, semua pesan Anda harus menggunakan skema yang sama. Jika Anda perlu memproses pesan dengan struktur yang berbeda ke dalam kolom atau family kolom yang berbeda, sebaiknya buat instance konektor terpisah. Untuk mengetahui informasi selengkapnya, lihat bagian Contoh: menulis pesan ke beberapa tabel dalam dokumen ini.

  • Nama grup kolom Bigtable didasarkan pada nama struct tingkat root non-null. Dengan demikian:

    • Nilai untuk detail kontak adalah jenis data primitif tingkat root, jadi Anda menggabungkannya ke dalam kelompok kolom default dengan setelan default.column.family=contact_details.
    • Detail pesanan sudah dibungkus dalam objek order, tetapi Anda ingin menggunakan order_details sebagai nama family kolom. Untuk melakukannya, Anda menggunakan SMT ReplaceFields dan mengganti nama kolom.

Contoh: pembuatan tabel otomatis dan penulisan idempoten

Contoh skenario

Pesan Kafka masuk Anda berisi detail untuk pesanan shopping. Pelanggan dapat mengedit keranjang sebelum pemenuhan, jadi Anda akan menerima pesan lanjutan dengan pesanan yang diubah yang perlu Anda simpan sebagai pembaruan di baris yang sama. Anda juga tidak dapat menjamin bahwa tabel tujuan ada pada waktu penulisan, jadi Anda ingin konektor membuat tabel secara otomatis jika tabel tersebut tidak ada.

Konfigurasi konektor
Untuk mendapatkan hasil yang diharapkan, Anda menulis file konfigurasi berikut:
# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for full settings reference.

# Settings for row key creation also omitted.
# Refer to the Example: flexible row key and column family creation section.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Name of the topic where shopping details are posted
topics=shopping_topic

# Automatically create destination tables if they don't exist
auto.create.tables=true

# UPSERT causes subsequent writes to overwrite existing rows.
# This way you can update the same order when customers change the contents
# of their baskets.
insert.mode=upsert

Contoh: menulis pesan ke beberapa tabel

Contoh skenario

Pesan Kafka masuk Anda berisi detail untuk pesanan belanja dari berbagai saluran pemenuhan. Pesan ini diposting ke topik yang berbeda, dan Anda ingin menggunakan file konfigurasi yang sama untuk menulisnya ke tabel yang terpisah.

Konfigurasi konektor

Anda dapat menulis pesan ke beberapa tabel, tetapi jika Anda menggunakan satu file konfigurasi untuk penyiapan, setiap pesan harus menggunakan skema yang sama. Jika Anda perlu memproses pesan dari topik yang berbeda ke dalam kolom atau grup yang berbeda, sebaiknya buat instance konektor terpisah.

Untuk mendapatkan hasil yang diharapkan, Anda menulis file konfigurasi berikut:

# Settings such as latency configuration or DLQ identifiers omitted for brevity.
# Refer to the GitHub repository for full settings reference.

# Settings for row key creation are also omitted.
# Refer to the Example: flexible row key and column family creation section.

# Connector name, class, Bigtable and Google Cloud identifiers
name=BigtableSinkConnector
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
gcp.bigtable.project.id=my_project_id
gcp.bigtable.instance.id=my_bigtable_instance_id

# Use JsonConverter to format Kafka messages as JSON
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Name of the topics where shopping details are posted
topics=shopping_topic_store1,shopping_topic_store2

# Use a dynamic table name based on the Kafka topic name.
table.name.format=orders_${topic}

Dalam pendekatan ini, Anda menggunakan properti table.name.format=orders_${topic} untuk merujuk secara dinamis ke setiap nama topik Kafka. Saat Anda mengonfigurasi beberapa nama topik dengan setelan topics=shopping_topic_store1,shopping_topic_store2, setiap pesan ditulis ke tabel terpisah:

  • Pesan dari topik shopping_topic_store1 ditulis ke tabel orders_shopping_topic_store1.
  • Pesan dari topik shopping_topic_store2 ditulis ke tabel orders_shopping_topic_store2.

Langkah berikutnya