Menggunakan Dataflow dengan Managed Service for Apache Kafka

Halaman ini menjelaskan cara menggunakan Google Cloud Managed Service for Apache Kafka sebagai sumber atau tujuan dalam pipeline Dataflow.

Anda dapat menggunakan salah satu pendekatan berikut:

Persyaratan

  • Aktifkan Cloud Storage, Dataflow, dan Managed Service for Apache Kafka API di project Anda. Lihat bagian Mengaktifkan API atau jalankan perintah Google Cloud CLI berikut:

    gcloud services enable dataflow.googleapis.com managedkafka.googleapis.com storage-component.googleapis.com
    
  • Akun layanan pekerja Dataflow harus memiliki peran Identity and Access Management (IAM) Managed Kafka Client (roles/managedkafka.client).

  • VM pekerja Dataflow harus memiliki akses jaringan ke server bootstrap Kafka. Untuk mengetahui informasi selengkapnya, lihat Mengonfigurasi jaringan Managed Service for Apache Kafka.

Mendapatkan alamat server bootstrap

Untuk menjalankan pipeline yang terhubung ke cluster Managed Service for Apache Kafka, dapatkan terlebih dahulu alamat server bootstrap cluster. Anda memerlukan alamat ini saat mengonfigurasi pipeline.

Anda dapat menggunakan konsol Google Cloud atau Google Cloud CLI, sebagai berikut:

Konsol

  1. Di konsol Google Cloud , buka halaman Clusters.

    Buka Cluster

  2. Klik nama cluster.

  3. Klik tab Konfigurasi.

  4. Salin alamat server bootstrap dari Bootstrap URL.

gcloud

Gunakan perintah managed-kafka clusters describe.

gcloud managed-kafka clusters describe CLUSTER_ID \
  --location=LOCATION \
  --format="value(bootstrapAddress)"

Ganti kode berikut:

  • CLUSTER_ID: ID atau nama cluster
  • LOCATION: lokasi cluster

Untuk mengetahui informasi selengkapnya, lihat Melihat cluster Managed Service for Apache Kafka.

Menggunakan Managed Service for Apache Kafka dengan template Dataflow

Google menyediakan beberapa template Dataflow yang membaca dari Apache Kafka:

Template ini dapat digunakan dengan Managed Service for Apache Kafka. Jika salah satu di antaranya cocok dengan kasus penggunaan Anda, pertimbangkan untuk menggunakannya daripada menulis kode pipeline kustom.

Konsol

  1. Buka halaman Dataflow > Jobs.

    Buka Tugas

  2. Klik Buat tugas dari template.

  3. Di bagian Job name, masukkan nama untuk tugas.

  4. Dari menu drop-down template Dataflow, pilih template yang akan dijalankan.

  5. Di kotak Kafka bootstrap server, masukkan alamat server bootstrap.

  6. Di kotak Kafka topic, masukkan nama topik.

  7. Untuk Kafka authentication mode, pilih APPLICATION_DEFAULT_CREDENTIALS.

  8. Untuk Kafka message format, pilih format pesan Apache Kafka.

  9. Masukkan parameter lain sesuai kebutuhan. Parameter yang didukung didokumentasikan untuk setiap template.

  10. Jalankan tugas.

gcloud

Gunakan perintah gcloud dataflow jobs run.

gcloud dataflow jobs run JOB_NAME \
  --gcs-location gs://TEMPLATE_FILE \
  --region REGION_NAME \
  --parameters \
readBootstrapServerAndTopic=projects/PROJECT_NAME/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS\,
# Other parameters, depending on the template
# ...

Ganti kode berikut:

  • JOB_NAME: nama untuk tugas
  • TEMPLATE_FILE: lokasi file template di Cloud Storage
  • REGION_NAME: region tempat Anda ingin men-deploy tugas
  • PROJECT_NAME: nama Google Cloud project Anda
  • LOCATION: lokasi cluster
  • CLUSTER_ID: ID atau nama cluster
  • TOPIC: nama topik Kafka

Menggunakan Managed Service for Apache Kafka dengan pipeline Beam

Bagian ini menjelaskan cara menggunakan Apache Beam SDK untuk membuat dan menjalankan pipeline Dataflow yang terhubung ke Managed Service for Apache Kafka.

Untuk sebagian besar skenario, gunakan transformasi I/O terkelola sebagai sumber atau tujuan Kafka Anda. Jika Anda memerlukan penyesuaian performa yang lebih canggih, pertimbangkan untuk menggunakan konektor KafkaIO. Untuk mengetahui informasi selengkapnya tentang manfaat penggunaan I/O terkelola, lihat I/O terkelola Dataflow.

Persyaratan

  • Kafka Client versi 3.6.0 atau yang lebih baru.

  • Apache Beam SDK versi 2.61.0 atau yang lebih baru.

  • Mesin tempat Anda memulai tugas Dataflow harus memiliki akses jaringan ke server bootstrap Apache Kafka. Misalnya, mulai tugas dari instance Compute Engine yang dapat mengakses VPC tempat cluster dapat dijangkau.

  • Akun utama yang membuat tugas harus memiliki peran IAM berikut:

    • Managed Kafka Client (roles/managedkafka.client) untuk mengakses cluster Apache Kafka.
    • Pengguna Akun Layanan (roles/iam.serviceAccountUser) untuk bertindak sebagai akun layanan pekerja Dataflow.
    • Storage Admin (roles/storage.admin) untuk mengupload file tugas ke Cloud Storage.
    • Dataflow Admin (roles/dataflow.admin) untuk membuat tugas.

    Jika memulai tugas dari instance Compute Engine, Anda dapat memberikan peran ini ke akun layanan yang terpasang ke VM. Untuk mengetahui informasi selengkapnya, lihat Membuat VM yang menggunakan akun layanan yang dikelola pengguna.

    Anda juga dapat menggunakan Kredensial Default Aplikasi (ADC) dengan peniruan akun layanan saat membuat tugas.

Mengonfigurasi I/O Terkelola

Jika pipeline Anda menggunakan Managed I/O for Apache Kafka, tetapkan opsi konfigurasi berikut untuk melakukan autentikasi dengan Managed Service for Apache Kafka:

  • "security.protocol": "SASL_SSL"
  • "sasl.mechanism": "OAUTHBEARER"
  • "sasl.login.callback.handler.class": "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
  • "sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"

Contoh berikut menunjukkan cara mengonfigurasi I/O terkelola untuk Managed Service for Apache Kafka:

Java

    // Create configuration parameters for the Managed I/O transform.
    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
      .put("bootstrap_servers", options.getBootstrapServer())
      .put("topic", options.getTopic())
      .put("data_format", "RAW")
      // Set the following fields to authenticate with Application Default
      // Credentials (ADC):
      .put("security.protocol", "SASL_SSL")
      .put("sasl.mechanism", "OAUTHBEARER")
      .put("sasl.login.callback.handler.class",
          "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler")
      .put("sasl.jaas.config",   "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;")
      .build();

Python

pipeline
| beam.managed.Read(
    beam.managed.KAFKA,
    config={
      "bootstrap_servers": options.bootstrap_server,
      "topic": options.topic,
      "data_format": "RAW",
      # Set the following fields to authenticate with Application Default
      # Credentials (ADC):
      "security.protocol": "SASL_SSL",
      "sasl.mechanism": "OAUTHBEARER",
      "sasl.login.callback.handler.class":
          "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
      "sasl.jaas.config":
          "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
    }
)

Mengonfigurasi konektor KafkaIO

Contoh berikut menunjukkan cara mengonfigurasi konektor KafkaIO untuk Managed Service for Apache Kafka:

Java

String bootstap = options.getBootstrap();
String topicName = options.getTopic();

// Read from Kafka
pipeline.apply(KafkaIO.<String, String>read()
    .withBootstrapServers(bootstap)
    .withTopic(topicName)
    .withKeyDeserializer(IntegerSerializer.class)
    .withValueDeserializer(StringDeserializer.class)
    .withGCPApplicationDefaultCredentials())

// Write to Kafka
pipeline.apply(KafkaIO.<Integer, String>write()
    .withBootstrapServers(bootstrap)
    .withTopic(topicName)
    .withKeySerializer(IntegerSerializer.class)
    .withValueSerializer(StringSerializer.class)
    .withGCPApplicationDefaultCredentials());

Python

WriteToKafka(
  producer_config={
    "bootstrap.servers": options.bootstrap_servers,
    "security.protocol": 'SASL_SSL',
    "sasl.mechanism": "OAUTHBEARER",
    "sasl.login.callback.handler.class": "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
    "sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
  },
  topic=options.topic,
  key_serializer=("org.apache.kafka.common.serialization." "LongSerializer"),
  value_serializer=("org.apache.kafka.common.serialization." "StringSerializer")
)

Langkah berikutnya