Konektor sink Bigtable Kafka Connect
Konektor sink adalah plugin untuk framework Kafka Connect yang dapat Anda gunakan untuk melakukan streaming data dari Kafka langsung ke sistem lain untuk penyimpanan dan pemrosesan. Sink Kafka Connect Bigtable adalah konektor khusus yang dirancang untuk melakukan streaming data ke Bigtable secara real time dengan latensi sekecil mungkin.
Halaman ini menjelaskan fitur dan batasan konektor. Artikel ini juga memberikan contoh penggunaan untuk skenario lanjutan dengan Transformasi Pesan Tunggal (SMT) dan pembuatan tabel otomatis. Untuk mengetahui petunjuk penginstalan dan dokumentasi referensi lengkap, lihat repositori Kafka Connect Bigtable Sink Connector.
Fitur
Konektor sink Bigtable berlangganan topik Kafka Anda, membaca pesan yang diterima di topik ini, lalu menulis data ke tabel Bigtable. Bagian berikut memberikan ringkasan tingkat tinggi untuk setiap fitur. Untuk mengetahui detail penggunaan, lihat bagian Konfigurasi dalam dokumen ini.
Pemetaan tombol, SMT, dan konverter
Untuk menulis data ke tabel Bigtable, Anda harus memberikan
kunci baris, grup kolom, dan nama kolom yang unik untuk setiap operasi.
Informasi ini disimpulkan dari kolom dalam pesan Kafka.
Anda dapat membuat semua ID yang diperlukan dengan setelan seperti
row.key.definition
, row.key.delimiter
, atau
default.column.family
.
Pembuatan tabel otomatis
Anda dapat menggunakan setelan auto.create.tables
dan
auto.create.column.families
untuk otomatis
membuat tabel tujuan dan grup kolom jika tidak ada di
tujuan Bigtable Anda. Fleksibilitas ini memiliki biaya performa
tertentu, jadi sebaiknya Anda membuat tabel terlebih dahulu tempat Anda
ingin melakukan streaming data.
Mode penulisan dan menghapus baris
Saat menulis ke tabel, Anda dapat menimpa data sepenuhnya
jika baris sudah ada, atau memilih untuk menghentikan operasi
dengan setelan insert.mode
. Anda dapat memanfaatkan setelan ini
bersama dengan penanganan error DLQ untuk mendapatkan jaminan pengiriman setidaknya sekali.
Untuk mengeluarkan perintah DELETE
, konfigurasi properti value.null.mode
. Anda dapat menggunakannya untuk menghapus seluruh baris, family kolom, atau
kolom individual.
Antrean Pesan yang Tidak Terkirim
Konfigurasi properti errors.deadletterqueue.topic.name
dan tetapkan errors.tolerance=all
untuk memposting pesan yang gagal
diproses ke topik DLQ Anda.
Kompatibilitas dengan Confluent Platform Bigtable Sink Connector
Konektor sink Kafka Connect Bigtable dari Google Cloud
menawarkan paritas penuh dengan
Konektor Sink Bigtable Confluent Platform yang dikelola sendiri.
Anda dapat menggunakan file konfigurasi yang ada untuk konektor Confluent Platform
dengan menyesuaikan setelan connector.class
menjadi
connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector
.
Batasan
Batasan berikut berlaku:
Konektor sink Bigtable Kafka Connect saat ini hanya didukung untuk cluster Kafka tempat Anda dapat menginstal konektor secara independen (cluster Kafka yang dikelola sendiri atau lokal). Konektor ini saat ini tidak didukung untuk Google Cloud Managed Service for Apache Kafka.
Konektor ini dapat membuat kolom dan kolom keluarga dari nama kolom hingga dua tingkat bersarang:
- Struct yang bertingkat lebih dari dua tingkat dikonversi menjadi
JSON
dan disimpan di kolom induknya. - Struktur tingkat root diubah menjadi grup kolom. Kolom dalam struct tersebut menjadi nama kolom.
- Nilai primitif tingkat root secara default disimpan ke grup kolom yang menggunakan topik Kafka sebagai namanya. Kolom dalam grup tersebut memiliki nama yang sama dengan nama kolom. Anda dapat mengubah perilaku ini menggunakan setelan
default.column.family
dandefault.column.qualifier
.
- Struct yang bertingkat lebih dari dua tingkat dikonversi menjadi
Penginstalan
Untuk menginstal konektor ini, ikuti langkah-langkah penginstalan standar:
buat project dengan Maven, salin file .jar
ke direktori plugin Kafka Connect, dan buat file konfigurasi.
Untuk mengetahui petunjuk langkah demi langkah, lihat bagian
Menjalankan konektor
di repositori.
Konfigurasi
Untuk mengonfigurasi konektor Kafka Connect, Anda perlu menulis file konfigurasi. Konektor sink Kafka Connect Bigtable dari Google Cloud mendukung semua properti konektor Kafka dasar, serta beberapa kolom tambahan yang disesuaikan untuk bekerja dengan tabel Bigtable.
Bagian berikut memberikan contoh mendetail untuk kasus penggunaan yang lebih canggih, tetapi tidak menjelaskan semua setelan yang tersedia. Untuk contoh penggunaan dasar dan referensi properti lengkap, lihat repositori Kafka Connect Bigtable Sink Connector.
Contoh: pembuatan kunci baris dan family kolom yang fleksibel
- Contoh skenario
-
Pesan Kafka masuk Anda berisi detail untuk pesanan belanja dengan ID pengguna. Anda ingin menulis setiap pesanan ke baris dengan dua family kolom: satu untuk detail pengguna, satu untuk detail pesanan.
- Format pesan Kafka sumber
-
Anda memformat pesan Kafka yang diposting ke topik dengan
JsonConverter
untuk mendapatkan struktur berikut:{ "user": "user123", "phone": "800‑555‑0199", "email": "business@example.com", "order": { id: "order123", items: ["itemUUID1", "itemUUID2"], discount: 0.2 } }
- Baris Bigtable yang diharapkan
-
Anda ingin menulis setiap pesan sebagai baris Bigtable dengan struktur berikut:
Row key contact_details order_details nama telepon email orderId item diskon user123#order123
user123 800‑555‑0199 business@example.com order123 ["itemUUID1", "itemUUID2"] 0,2 - Konfigurasi konektor
-
Untuk mendapatkan hasil yang diharapkan, Anda menulis file konfigurasi berikut:
# Settings such as latency configuration or DLQ identifiers omitted for brevity. # Refer to the GitHub repository for full settings reference. # Connector name, class, Bigtable and Google Cloud identifiers name=BigtableSinkConnector connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector gcp.bigtable.project.id=my_project_id gcp.bigtable.instance.id=my_bigtable_instance_id # Use JsonConverter to format Kafka messages as JSON key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # Name of the topic where shopping details are posted topics=shopping_topic # Settings for row key creation row.key.definition=user,order.id row.key.delimiter=# # All user identifiers are root level fields. # Use the default column family to aggregate them into a single family. default.column.family=contact_details # Use SMT to rename "orders" field into "order_details" for the new column family transforms=renameOrderField transforms.renameOrderField.type=org.apache.kafka.connect.transforms.ReplaceField$Key transforms.renameOrderField.renames=order:order_details
Hasil penggunaan file ini adalah sebagai berikut:
-
row.key.definition=user,order.id
adalah daftar kolom yang dipisahkan koma yang ingin Anda gunakan untuk membuat kunci baris. Setiap entri digabungkan dengan set karakter dalam setelanrow.key.delimiter
.Saat Anda menggunakan
row.key.definition
, semua pesan Anda harus menggunakan skema yang sama. Jika Anda perlu memproses pesan dengan struktur yang berbeda ke dalam kolom atau family kolom yang berbeda, sebaiknya buat instance konektor terpisah. Untuk mengetahui informasi selengkapnya, lihat bagian Contoh: menulis pesan ke beberapa tabel dalam dokumen ini. -
Nama grup kolom Bigtable didasarkan pada nama struct tingkat root non-null. Dengan demikian:
- Nilai untuk detail kontak adalah jenis data primitif tingkat root, jadi
Anda menggabungkannya ke dalam kelompok kolom default dengan setelan
default.column.family=contact_details
. - Detail pesanan sudah dibungkus dalam objek
order
, tetapi Anda ingin menggunakanorder_details
sebagai nama family kolom. Untuk melakukannya, Anda menggunakan SMT ReplaceFields dan mengganti nama kolom.
- Nilai untuk detail kontak adalah jenis data primitif tingkat root, jadi
Anda menggabungkannya ke dalam kelompok kolom default dengan setelan
Contoh: pembuatan tabel otomatis dan penulisan idempoten
- Contoh skenario
-
Pesan Kafka masuk Anda berisi detail untuk pesanan shopping. Pelanggan dapat mengedit keranjang sebelum pemenuhan, jadi Anda akan menerima pesan lanjutan dengan pesanan yang diubah yang perlu Anda simpan sebagai pembaruan di baris yang sama. Anda juga tidak dapat menjamin bahwa tabel tujuan ada pada waktu penulisan, jadi Anda ingin konektor membuat tabel secara otomatis jika tabel tersebut tidak ada.
- Konfigurasi konektor
-
Untuk mendapatkan hasil yang diharapkan, Anda menulis file konfigurasi berikut:
# Settings such as latency configuration or DLQ identifiers omitted for brevity. # Refer to the GitHub repository for full settings reference. # Settings for row key creation also omitted. # Refer to the Example: flexible row key and column family creation section. # Connector name, class, Bigtable and Google Cloud identifiers name=BigtableSinkConnector connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector gcp.bigtable.project.id=my_project_id gcp.bigtable.instance.id=my_bigtable_instance_id # Use JsonConverter to format Kafka messages as JSON key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # Name of the topic where shopping details are posted topics=shopping_topic # Automatically create destination tables if they don't exist auto.create.tables=true # UPSERT causes subsequent writes to overwrite existing rows. # This way you can update the same order when customers change the contents # of their baskets. insert.mode=upsert
Contoh: menulis pesan ke beberapa tabel
- Contoh skenario
-
Pesan Kafka masuk Anda berisi detail untuk pesanan belanja dari berbagai saluran pemenuhan. Pesan ini diposting ke topik yang berbeda, dan Anda ingin menggunakan file konfigurasi yang sama untuk menulisnya ke tabel yang terpisah.
- Konfigurasi konektor
-
Anda dapat menulis pesan ke beberapa tabel, tetapi jika Anda menggunakan satu file konfigurasi untuk penyiapan, setiap pesan harus menggunakan skema yang sama. Jika Anda perlu memproses pesan dari topik yang berbeda ke dalam kolom atau grup yang berbeda, sebaiknya buat instance konektor terpisah.
Untuk mendapatkan hasil yang diharapkan, Anda menulis file konfigurasi berikut:
# Settings such as latency configuration or DLQ identifiers omitted for brevity. # Refer to the GitHub repository for full settings reference. # Settings for row key creation are also omitted. # Refer to the Example: flexible row key and column family creation section. # Connector name, class, Bigtable and Google Cloud identifiers name=BigtableSinkConnector connector.class=com.google.cloud.kafka.connect.bigtable.BigtableSinkConnector gcp.bigtable.project.id=my_project_id gcp.bigtable.instance.id=my_bigtable_instance_id # Use JsonConverter to format Kafka messages as JSON key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter # Name of the topics where shopping details are posted topics=shopping_topic_store1,shopping_topic_store2 # Use a dynamic table name based on the Kafka topic name. table.name.format=orders_${topic}
Dalam pendekatan ini, Anda menggunakan properti
table.name.format=orders_${topic}
untuk merujuk secara dinamis ke setiap nama topik Kafka. Saat Anda mengonfigurasi beberapa nama topik dengan setelantopics=shopping_topic_store1,
, setiap pesan ditulis ke tabel terpisah:shopping_topic_store2 - Pesan dari topik
shopping_topic_store1
ditulis ke tabelorders_shopping_topic_store1
. - Pesan dari topik
shopping_topic_store2
ditulis ke tabelorders_shopping_topic_store2
.
- Pesan dari topik