Halaman ini menjelaskan cara menggunakan Google Cloud Managed Service for Apache Kafka sebagai sumber atau tujuan dalam pipeline Dataflow.
Anda dapat menggunakan salah satu pendekatan berikut:
Persyaratan
Aktifkan Cloud Storage, Dataflow, dan Managed Service for Apache Kafka API di project Anda. Lihat bagian Mengaktifkan API atau jalankan perintah Google Cloud CLI berikut:
gcloud services enable dataflow.googleapis.com managedkafka.googleapis.com storage-component.googleapis.com
Akun layanan pekerja Dataflow harus memiliki peran Identity and Access Management (IAM) Managed Kafka Client (
roles/managedkafka.client
).VM pekerja Dataflow harus memiliki akses jaringan ke server bootstrap Kafka. Untuk mengetahui informasi selengkapnya, lihat Mengonfigurasi jaringan Managed Service for Apache Kafka.
Mendapatkan alamat server bootstrap
Untuk menjalankan pipeline yang terhubung ke cluster Managed Service for Apache Kafka, dapatkan terlebih dahulu alamat server bootstrap cluster. Anda memerlukan alamat ini saat mengonfigurasi pipeline.
Anda dapat menggunakan konsol Google Cloud atau Google Cloud CLI, sebagai berikut:
Konsol
Di konsol Google Cloud , buka halaman Clusters.
Klik nama cluster.
Klik tab Konfigurasi.
Salin alamat server bootstrap dari Bootstrap URL.
gcloud
Gunakan perintah managed-kafka clusters describe
.
gcloud managed-kafka clusters describe CLUSTER_ID \
--location=LOCATION \
--format="value(bootstrapAddress)"
Ganti kode berikut:
- CLUSTER_ID: ID atau nama cluster
- LOCATION: lokasi cluster
Untuk mengetahui informasi selengkapnya, lihat Melihat cluster Managed Service for Apache Kafka.
Menggunakan Managed Service for Apache Kafka dengan template Dataflow
Google menyediakan beberapa template Dataflow yang membaca dari Apache Kafka:
Template ini dapat digunakan dengan Managed Service for Apache Kafka. Jika salah satu di antaranya cocok dengan kasus penggunaan Anda, pertimbangkan untuk menggunakannya daripada menulis kode pipeline kustom.
Konsol
Buka halaman Dataflow > Jobs.
Klik Buat tugas dari template.
Di bagian Job name, masukkan nama untuk tugas.
Dari menu drop-down template Dataflow, pilih template yang akan dijalankan.
Di kotak Kafka bootstrap server, masukkan alamat server bootstrap.
Di kotak Kafka topic, masukkan nama topik.
Untuk Kafka authentication mode, pilih APPLICATION_DEFAULT_CREDENTIALS.
Untuk Kafka message format, pilih format pesan Apache Kafka.
Masukkan parameter lain sesuai kebutuhan. Parameter yang didukung didokumentasikan untuk setiap template.
Jalankan tugas.
gcloud
Gunakan perintah
gcloud dataflow jobs run
.
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://TEMPLATE_FILE \
--region REGION_NAME \
--parameters \
readBootstrapServerAndTopic=projects/PROJECT_NAME/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS\,
# Other parameters, depending on the template
# ...
Ganti kode berikut:
- JOB_NAME: nama untuk tugas
- TEMPLATE_FILE: lokasi file template di Cloud Storage
- REGION_NAME: region tempat Anda ingin men-deploy tugas
- PROJECT_NAME: nama Google Cloud project Anda
- LOCATION: lokasi cluster
- CLUSTER_ID: ID atau nama cluster
- TOPIC: nama topik Kafka
Menggunakan Managed Service for Apache Kafka dengan pipeline Beam
Bagian ini menjelaskan cara menggunakan Apache Beam SDK untuk membuat dan menjalankan pipeline Dataflow yang terhubung ke Managed Service for Apache Kafka.
Untuk sebagian besar skenario, gunakan
transformasi I/O terkelola sebagai
sumber atau tujuan Kafka Anda. Jika Anda memerlukan penyesuaian performa yang lebih canggih, pertimbangkan untuk menggunakan konektor KafkaIO
.
Untuk mengetahui informasi selengkapnya tentang manfaat penggunaan I/O terkelola, lihat
I/O terkelola Dataflow.
Persyaratan
Kafka Client versi 3.6.0 atau yang lebih baru.
Apache Beam SDK versi 2.61.0 atau yang lebih baru.
Mesin tempat Anda memulai tugas Dataflow harus memiliki akses jaringan ke server bootstrap Apache Kafka. Misalnya, mulai tugas dari instance Compute Engine yang dapat mengakses VPC tempat cluster dapat dijangkau.
Akun utama yang membuat tugas harus memiliki peran IAM berikut:
- Managed Kafka Client (
roles/managedkafka.client
) untuk mengakses cluster Apache Kafka. - Pengguna Akun Layanan (
roles/iam.serviceAccountUser
) untuk bertindak sebagai akun layanan pekerja Dataflow. - Storage Admin (
roles/storage.admin
) untuk mengupload file tugas ke Cloud Storage. - Dataflow Admin (
roles/dataflow.admin
) untuk membuat tugas.
Jika memulai tugas dari instance Compute Engine, Anda dapat memberikan peran ini ke akun layanan yang terpasang ke VM. Untuk mengetahui informasi selengkapnya, lihat Membuat VM yang menggunakan akun layanan yang dikelola pengguna.
Anda juga dapat menggunakan Kredensial Default Aplikasi (ADC) dengan peniruan akun layanan saat membuat tugas.
- Managed Kafka Client (
Mengonfigurasi I/O Terkelola
Jika pipeline Anda menggunakan Managed I/O for Apache Kafka, tetapkan opsi konfigurasi berikut untuk melakukan autentikasi dengan Managed Service for Apache Kafka:
"security.protocol"
:"SASL_SSL"
"sasl.mechanism"
:"OAUTHBEARER"
"sasl.login.callback.handler.class"
:"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler"
"sasl.jaas.config"
:"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
Contoh berikut menunjukkan cara mengonfigurasi I/O terkelola untuk Managed Service for Apache Kafka:
Java
// Create configuration parameters for the Managed I/O transform.
ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
.put("bootstrap_servers", options.getBootstrapServer())
.put("topic", options.getTopic())
.put("data_format", "RAW")
// Set the following fields to authenticate with Application Default
// Credentials (ADC):
.put("security.protocol", "SASL_SSL")
.put("sasl.mechanism", "OAUTHBEARER")
.put("sasl.login.callback.handler.class",
"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler")
.put("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;")
.build();
Python
pipeline
| beam.managed.Read(
beam.managed.KAFKA,
config={
"bootstrap_servers": options.bootstrap_server,
"topic": options.topic,
"data_format": "RAW",
# Set the following fields to authenticate with Application Default
# Credentials (ADC):
"security.protocol": "SASL_SSL",
"sasl.mechanism": "OAUTHBEARER",
"sasl.login.callback.handler.class":
"com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
"sasl.jaas.config":
"org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
}
)
Mengonfigurasi konektor KafkaIO
Contoh berikut menunjukkan cara mengonfigurasi konektor KafkaIO
untuk
Managed Service for Apache Kafka:
Java
String bootstap = options.getBootstrap();
String topicName = options.getTopic();
// Read from Kafka
pipeline.apply(KafkaIO.<String, String>read()
.withBootstrapServers(bootstap)
.withTopic(topicName)
.withKeyDeserializer(IntegerSerializer.class)
.withValueDeserializer(StringDeserializer.class)
.withGCPApplicationDefaultCredentials())
// Write to Kafka
pipeline.apply(KafkaIO.<Integer, String>write()
.withBootstrapServers(bootstrap)
.withTopic(topicName)
.withKeySerializer(IntegerSerializer.class)
.withValueSerializer(StringSerializer.class)
.withGCPApplicationDefaultCredentials());
Python
WriteToKafka(
producer_config={
"bootstrap.servers": options.bootstrap_servers,
"security.protocol": 'SASL_SSL',
"sasl.mechanism": "OAUTHBEARER",
"sasl.login.callback.handler.class": "com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler",
"sasl.jaas.config": "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;"
},
topic=options.topic,
key_serializer=("org.apache.kafka.common.serialization." "LongSerializer"),
value_serializer=("org.apache.kafka.common.serialization." "StringSerializer")
)
Langkah berikutnya
- Pelajari lebih lanjut Managed Service for Apache Kafka.
- Menulis data dari Managed Service for Apache Kafka ke BigQuery.
- Membaca dari Apache Kafka ke Dataflow.
- Menulis dari Dataflow ke Apache Kafka.