Menghubungkan Pub/Sub ke Apache Kafka

Dokumen ini menjelaskan cara mengintegrasikan Apache Kafka dan Pub/Sub dengan menggunakan Konektor Kafka Grup Pub/Sub.

Tentang Konektor Kafka Grup Pub/Sub

Apache Kafka adalah platform open source untuk streaming peristiwa. Biasanya digunakan dalam arsitektur terdistribusi untuk mengaktifkan komunikasi antar komponen yang dikaitkan secara longgar. Pub/Sub adalah layanan terkelola untuk mengirim dan menerima pesan secara asinkron. Seperti Kafka, Anda dapat menggunakan Pub/Sub untuk berkomunikasi antar-komponen dalam arsitektur cloud Anda.

Konektor Kafka Grup Pub/Sub memungkinkan Anda mengintegrasikan kedua sistem ini. Konektor berikut dikemas dalam JAR Konektor:

  • Konektor sink membaca data dari satu atau beberapa topik Kafka dan memublikasikannya ke Pub/Sub.
  • Konektor sumber membaca pesan dari topik Pub/Sub dan memublikasikannya ke Kafka.

Berikut beberapa skenario saat Anda dapat menggunakan Konektor Kafka Grup Pub/Sub:

  • Anda memigrasikan arsitektur berbasis Kafka ke Google Cloud.
  • Anda memiliki sistem frontend yang menyimpan peristiwa di Kafka di luar Google Cloud, tetapi Anda juga menggunakan Google Cloud untuk menjalankan beberapa layanan backend Anda, yang perlu menerima peristiwa Kafka.
  • Anda mengumpulkan log dari solusi Kafka lokal dan mengirimkannya ke Google Cloud untuk analisis data.
  • Anda memiliki sistem frontend yang menggunakan Google Cloud, tetapi Anda juga menyimpan data di tempat menggunakan Kafka.

Konektor ini memerlukan Kafka Connect, yang merupakan framework untuk streaming data antara Kafka dan sistem lainnya. Untuk menggunakan konektor, Anda harus menjalankan Kafka Connect bersama dengan cluster Kafka.

Dokumen ini mengasumsikan bahwa Anda sudah memahami Kafka dan Pub/Sub. Sebelum membaca dokumen ini, sebaiknya selesaikan salah satu mulai cepat Pub/Sub.

Konektor Pub/Sub tidak mendukung integrasi apa pun antara Google Cloud ACL IAM dan Kafka Connect.

Mulai menggunakan konektor

Bagian ini akan memandu Anda melakukan tugas-tugas berikut:

  1. Konfigurasi Konektor Kafka Grup Pub/Sub.
  2. Mengirim peristiwa dari Kafka ke Pub/Sub.
  3. Mengirim pesan dari Pub/Sub ke Kafka.

Prasyarat

Menginstal Kafka

Ikuti panduan memulai Apache Kafka untuk menginstal Kafka satu node di komputer lokal Anda. Selesaikan langkah-langkah berikut di panduan memulai:

  1. Download rilis Kafka terbaru dan ekstrak.
  2. Mulai lingkungan Kafka.
  3. Buat topik Kafka.

Autentikasikan

Konektor Kafka Grup Pub/Sub harus melakukan autentikasi dengan Pub/Sub untuk mengirim dan menerima pesan Pub/Sub. Untuk menyiapkan autentikasi, lakukan langkah-langkah berikut:

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  7. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  8. Install the Google Cloud CLI.

  9. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  10. To initialize the gcloud CLI, run the following command:

    gcloud init
  11. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  12. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  13. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/pubsub.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  14. Mendownload JAR konektor

    Download file JAR konektor ke komputer lokal Anda. Untuk mengetahui informasi selengkapnya, lihat Mendapatkan konektor di readme GitHub.

    Menyalin file konfigurasi konektor

    1. Clone atau download repositori GitHub untuk konektor.

      git clone https://github.com/googleapis/java-pubsub-group-kafka-connector.git
      cd java-pubsub-group-kafka-connector
      
    2. Salin konten direktori config ke subdirektori config dari penginstalan Kafka Anda.

      cp config/* [path to Kafka installation]/config/
      

    File ini berisi setelan konfigurasi untuk konektor.

    Memperbarui konfigurasi Kafka Connect

    1. Buka direktori yang berisi biner Kafka Connect yang Anda download.
    2. Di direktori biner Kafka Connect, buka file bernama config/connect-standalone.properties di editor teks.
    3. Jika plugin.path property diberi tanda komentar, hapus tanda komentarnya.
    4. Perbarui plugin.path property untuk menyertakan jalur ke JAR konektor.

      Contoh:

      plugin.path=/home/PubSubKafkaConnector/pubsub-group-kafka-connector-1.0.0.jar
      
    5. Tetapkan properti offset.storage.file.filename ke nama file lokal. Dalam mode mandiri, Kafka menggunakan file ini untuk menyimpan data offset.

      Contoh:

      offset.storage.file.filename=/tmp/connect.offsets
      

    Meneruskan peristiwa dari Kafka ke Pub/Sub

    Bagian ini menjelaskan cara memulai konektor sink, memublikasikan peristiwa ke Kafka, lalu membaca pesan yang diteruskan dari Pub/Sub.

    1. Gunakan Google Cloud CLI untuk membuat topik Pub/Sub dengan langganan.

      gcloud pubsub topics create PUBSUB_TOPIC
      gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC

      Ganti kode berikut:

      • PUBSUB_TOPIC: Nama topik Pub/Sub untuk menerima pesan dari Kafka.
      • PUBSUB_SUBSCRIPTION: Nama langganan Pub/Sub untuk topik.
    2. Buka file /config/cps-sink-connector.properties di editor teks. Tambahkan nilai untuk properti berikut, yang ditandai "TODO" dalam komentar:

      topics=KAFKA_TOPICS
      cps.project=PROJECT_ID
      cps.topic=PUBSUB_TOPIC

      Ganti kode berikut:

      • KAFKA_TOPICS: Daftar topik Kafka yang dipisahkan koma untuk dibaca.
      • PROJECT_ID: Project Google Cloud yang berisi topik Pub/Sub Anda.
      • PUBSUB_TOPIC: Topik Pub/Sub untuk menerima pesan dari Kafka.
    3. Dari direktori Kafka, jalankan perintah berikut:

      bin/connect-standalone.sh \
        config/connect-standalone.properties \
        config/cps-sink-connector.properties
      
    4. Ikuti langkah-langkah di Panduan memulai Apache Kafka untuk menulis beberapa peristiwa ke topik Kafka Anda.

    5. Gunakan gcloud CLI untuk membaca peristiwa dari Pub/Sub.

      gcloud pubsub subscriptions pull PUBSUB_SUBSCRIPTION --auto-ack

    Meneruskan pesan dari Pub/Sub ke Kafka

    Bagian ini menjelaskan cara memulai konektor sumber, memublikasikan pesan ke Pub/Sub, dan membaca pesan yang diteruskan dari Kafka.

    1. Gunakan gcloud CLI untuk membuat topik Pub/Sub dengan langganan.

      gcloud pubsub topics create PUBSUB_TOPIC
      gcloud pubsub subscriptions create PUBSUB_SUBSCRIPTION --topic=PUBSUB_TOPIC

      Ganti kode berikut:

      • PUBSUB_TOPIC: Nama topik Pub/Sub.
      • PUBSUB_SUBSCRIPTION: Nama langganan Pub/Sub.
    2. Buka file bernama /config/cps-source-connector.properties di editor teks. Tambahkan nilai untuk properti berikut, yang ditandai "TODO" dalam komentar:

      kafka.topic=KAFKA_TOPIC
      cps.project=PROJECT_ID
      cps.subscription=PUBSUB_SUBSCRIPTION

      Ganti kode berikut:

      • KAFKA_TOPIC: Topik Kafka untuk menerima pesan Pub/Sub.
      • PROJECT_ID: Project Google Cloud yang berisi topik Pub/Sub Anda.
      • PUBSUB_TOPIC: Topik Pub/Sub.
    3. Dari direktori Kafka, jalankan perintah berikut:

      bin/connect-standalone.sh \
        config/connect-standalone.properties \
        config/cps-source-connector.properties
      
    4. Gunakan gcloud CLI untuk memublikasikan pesan ke Pub/Sub.

      gcloud pubsub topics publish PUBSUB_TOPIC --message="message 1"
    5. Baca pesan dari Kafka. Ikuti langkah-langkah dalam Panduan memulai Apache Kafka untuk membaca pesan dari topik Kafka.

    Konversi pesan

    Kafka record berisi kunci dan nilai, yang merupakan array byte dengan panjang variabel. Secara opsional, data Kafka juga dapat memiliki header, yang merupakan pasangan nilai kunci. Pesan Pub/Sub memiliki dua bagian utama: isi pesan dan nol atau lebih atribut key-value.

    Kafka Connect menggunakan konverter untuk melakukan serialisasi kunci dan nilai ke dan dari Kafka. Untuk mengontrol serialisasi, tetapkan properti berikut dalam file konfigurasi konektor:

    • key.converter: Konverter yang digunakan untuk menyerialisasi kunci rekaman.
    • value.converter: Konverter yang digunakan untuk menyerialisasi nilai rekaman.

    Isi pesan Pub/Sub adalah objek ByteString, sehingga konversi yang paling efisien adalah menyalin payload secara langsung. Oleh karena itu, sebaiknya gunakan konverter yang menghasilkan jenis data primitif (skema integer, float, string, atau byte) jika memungkinkan, untuk mencegah deserialisasi dan serialisasi ulang isi pesan yang sama.

    Konversi dari Kafka ke Pub/Sub

    Konektor sink mengonversi rekaman Kafka menjadi pesan Pub/Sub sebagai berikut:

    • Kunci rekaman Kafka disimpan sebagai atribut bernama "key" dalam pesan Pub/Sub.
    • Secara default, konektor akan menghapus header apa pun dalam rekaman Kafka. Namun, jika Anda menyetel opsi konfigurasi headers.publish ke true, konektor akan menulis header sebagai atribut Pub/Sub. Konektor akan melewati header apa pun yang melebihi batas atribut pesan di Pub/Sub.
    • Untuk skema bilangan bulat, float, string, dan byte, konektor meneruskan byte nilai rekaman Kafka langsung ke isi pesan Pub/Sub.
    • Untuk skema struct, konektor menulis setiap kolom sebagai atribut pesan Pub/Sub. Misalnya, jika kolomnya adalah { "id"=123 }, pesan Pub/Sub yang dihasilkan memiliki atribut "id"="123". Nilai kolom selalu dikonversi menjadi string. Jenis peta dan struct tidak didukung sebagai jenis kolom dalam struct.
    • Untuk skema peta, konektor menulis setiap pasangan nilai kunci sebagai atribut pesan Pub/Sub. Misalnya, jika peta adalah {"alice"=1,"bob"=2}, pesan Pub/Sub yang dihasilkan memiliki dua atribut, "alice"="1" dan "bob"="2". Kunci dan nilai dikonversi menjadi string.

    Skema struct dan peta memiliki beberapa perilaku tambahan:

    • Secara opsional, Anda dapat menentukan kolom struct atau kunci peta tertentu sebagai isi pesan, dengan menyetel properti konfigurasi messageBodyName. Nilai kolom atau kunci disimpan sebagai ByteString dalam isi pesan. Jika Anda tidak menetapkan messageBodyName, maka isi pesan akan kosong untuk skema struct dan map.

    • Untuk nilai array, konektor hanya mendukung jenis array primitif. Urutan nilai dalam array digabungkan menjadi satu objek ByteString.

    Konversi dari Pub/Sub ke Kafka

    Konektor sumber mengonversi pesan Pub/Sub menjadi rekaman Kafka sebagai berikut:

    • Kunci rekaman Kafka: Secara default, kunci disetel ke null. Secara opsional, Anda dapat menentukan atribut pesan Pub/Sub yang akan digunakan sebagai kunci, dengan menetapkan opsi konfigurasi kafka.key.attribute. Dalam hal ini, konektor akan mencari atribut dengan nama tersebut dan menetapkan kunci rekaman ke nilai atribut. Jika atribut yang ditentukan tidak ada, kunci rekaman akan ditetapkan ke null.

    • Nilai rekaman Kafka. Konektor menulis nilai rekaman sebagai berikut:

      • Jika pesan Pub/Sub tidak memiliki atribut kustom, konektor akan menulis isi pesan Pub/Sub langsung ke nilai rekaman Kafka sebagai jenis byte[], menggunakan konverter yang ditentukan oleh value.converter.

      • Jika pesan Pub/Sub memiliki atribut kustom dan kafka.record.headers adalah false, konektor akan menulis struct ke nilai rekaman. Struktur ini berisi satu kolom untuk setiap atribut, dan kolom bernama "message" yang nilainya adalah isi pesan Pub/Sub (disimpan sebagai byte):

        {
          "message": "<Pub/Sub message body>",
          "<attribute-1>": "<value-1>",
          "<attribute-2>": "<value-2>",
          ....
        }
        

        Dalam hal ini, Anda harus menggunakan value.converter yang kompatibel dengan skema struct, seperti org.apache.kafka.connect.json.JsonConverter.

      • Jika pesan Pub/Sub memiliki atribut kustom dan kafka.record.headers adalah true, konektor akan menulis atribut sebagai header rekaman Kafka. Menulis isi pesan Pub/Sub langsung ke nilai rekaman Kafka sebagai jenis byte[], menggunakan konverter yang ditentukan oleh value.converter.

    • Header rekaman Kafka. Secara default, header kosong, kecuali jika Anda menyetel kafka.record.headers ke true.

    Opsi konfigurasi

    Selain konfigurasi yang disediakan oleh Kafka Connect API, Pub/Sub Group Kafka Connector mendukung konfigurasi sink dan sumber seperti yang dijelaskan dalam konfigurasi konektor Pub/Sub.

    Mendapatkan dukungan

    Jika Anda memerlukan bantuan, buat tiket dukungan. Untuk pertanyaan dan diskusi umum, buat masalah di repositori GitHub.

    Langkah berikutnya