Template Apache Kafka ke Cloud Storage adalah pipeline streaming yang menyerap data teks dari Google Cloud Managed Service for Apache Kafka dan menghasilkan output rekaman ke Cloud Storage.
Anda juga dapat menggunakan template Apache Kafka ke BigQuery dengan Kafka yang dikelola sendiri atau eksternal.
Persyaratan pipeline
- Bucket Cloud Storage output harus ada.
- Server broker Apache Kafka harus berjalan dan dapat dijangkau dari mesin pekerja Dataflow.
- Topik Apache Kafka harus ada.
Format pesan Kafka
Template ini mendukung pembacaan pesan dari Kafka dalam format berikut:
Format JSON
Untuk membaca pesan JSON, tetapkan parameter template messageFormat
ke
"JSON"
.
Encoding biner Avro
Untuk membaca pesan Avro biner, tetapkan parameter template berikut:
messageFormat
:"AVRO_BINARY_ENCODING"
.binaryAvroSchemaPath
: Lokasi file skema Avro di Cloud Storage. Contoh:gs://BUCKET_NAME/message-schema.avsc
.
Untuk mengetahui informasi selengkapnya tentang format biner Avro, lihat Encoding biner di dokumentasi Apache Avro.
Avro yang dienkode Confluent Schema Registry
Untuk membaca pesan dalam Avro yang dienkode Confluent Schema Registry, tetapkan parameter template berikut:
messageFormat
:"AVRO_CONFLUENT_WIRE_FORMAT"
.schemaFormat
: Salah satu nilai berikut:"SINGLE_SCHEMA_FILE"
: Skema pesan ditentukan dalam file skema Avro. Tentukan lokasi Cloud Storage file skema dalam parameterconfluentAvroSchemaPath
.-
"SCHEMA_REGISTRY"
: Pesan dienkode menggunakan Confluent Schema Registry. Tentukan URL instance Confluent Schema Registry di parameterschemaRegistryConnectionUrl
, dan tentukan mode autentikasi di parameterschemaRegistryAuthenticationMode
.
Untuk mengetahui informasi selengkapnya tentang format ini, lihat Format wire dalam dokumentasi Confluent.
Format file output
Format file output adalah format yang sama dengan pesan Kafka input. Misalnya, jika Anda memilih JSON untuk format pesan Kafka, file JSON akan ditulis ke bucket Cloud Storage output.
Autentikasi
Template Apache Kafka ke Cloud Storage mendukung autentikasi SASL/PLAIN ke broker Kafka.
Parameter template
Parameter yang diperlukan
- readBootstrapServerAndTopic: Topik Kafka untuk membaca input.
- outputDirectory: Awalan jalur dan nama file untuk menulis file output. Harus diakhiri dengan garis miring. Contoh,
gs://your-bucket/your-path/
. - kafkaReadAuthenticationMode: Mode autentikasi yang akan digunakan dengan cluster Kafka. Gunakan
KafkaAuthenticationMethod.NONE
untuk tanpa autentikasi,KafkaAuthenticationMethod.SASL_PLAIN
untuk nama pengguna dan sandi SASL/PLAIN,KafkaAuthenticationMethod.SASL_SCRAM_512
untuk autentikasi SASL_SCRAM_512, danKafkaAuthenticationMethod.TLS
untuk autentikasi berbasis sertifikat.KafkaAuthenticationMethod.APPLICATION_DEFAULT_CREDENTIALS
hanya boleh digunakan untuk cluster Google Cloud Apache Kafka untuk BigQuery, yang memungkinkan autentikasi menggunakan kredensial default aplikasi. - messageFormat: Format pesan Kafka yang akan dibaca. Nilai yang didukung adalah
AVRO_CONFLUENT_WIRE_FORMAT
(Avro yang dienkode Confluent Schema Registry),AVRO_BINARY_ENCODING
(Avro biner biasa), danJSON
. Default-nya adalah: AVRO_CONFLUENT_WIRE_FORMAT. - useBigQueryDLQ: Jika benar (true), pesan yang gagal akan ditulis ke BigQuery dengan informasi error tambahan. Nilai defaultnya adalah: false.
Parameter opsional
- windowDuration: Durasi/ukuran jendela tempat data akan ditulis ke Cloud Storage. Format yang diizinkan adalah: Ns (untuk detik, contoh: 5s), Nm (untuk menit, contoh: 12m), Nh (untuk jam, contoh: 2h). Contoh,
5m
. Nilai defaultnya adalah: 5 menit. - outputFilenamePrefix: Awalan yang akan ditempatkan pada setiap file berwindow. Contoh,
output-
. Defaultnya adalah: output. - numShards: Jumlah maksimum shard output yang dihasilkan saat menulis. Jumlah shard yang lebih tinggi berarti throughput yang lebih tinggi untuk menulis ke Cloud Storage, tetapi berpotensi biaya agregasi data yang lebih tinggi di seluruh shard saat memproses file Cloud Storage output. Nilai default ditentukan oleh Dataflow.
- enableCommitOffsets: Mengirimkan offset pesan yang diproses ke Kafka. Jika diaktifkan, hal ini akan meminimalkan kesenjangan atau pemrosesan pesan duplikat saat memulai ulang pipeline. Memerlukan penentuan ID Grup Konsumen. Nilai defaultnya adalah: false.
- consumerGroupId: ID unik untuk grup konsumen tempat pipeline ini berada. Wajib jika Commit Offsets to Kafka diaktifkan. Nilai defaultnya adalah kosong.
- kafkaReadOffset: Titik awal untuk membaca pesan saat tidak ada offset yang di-commit. Yang paling awal dimulai dari awal, yang terbaru dimulai dari pesan terbaru. Nilai defaultnya adalah: latest.
- kafkaReadUsernameSecretId: ID secret Google Cloud Secret Manager yang berisi nama pengguna Kafka untuk digunakan dengan autentikasi
SASL_PLAIN
. Misalnya,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. Nilai defaultnya adalah kosong. - kafkaReadPasswordSecretId: ID secret Google Cloud Secret Manager yang berisi sandi Kafka untuk digunakan dengan autentikasi
SASL_PLAIN
. Misalnya,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. Nilai defaultnya adalah kosong. - kafkaReadKeystoreLocation: Jalur Google Cloud Storage ke file Java KeyStore (JKS) yang berisi sertifikat TLS dan kunci pribadi yang akan digunakan saat mengautentikasi dengan cluster Kafka. Contoh,
gs://your-bucket/keystore.jks
. - kafkaReadTruststoreLocation: Jalur Google Cloud Storage ke file Java TrustStore (JKS) yang berisi sertifikat tepercaya untuk digunakan dalam memverifikasi identitas broker Kafka.
- kafkaReadTruststorePasswordSecretId: ID secret Google Cloud Secret Manager yang berisi sandi yang akan digunakan untuk mengakses file Java TrustStore (JKS) untuk autentikasi TLS Kafka. Misalnya,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadKeystorePasswordSecretId: ID secret Google Cloud Secret Manager yang berisi sandi yang akan digunakan untuk mengakses file Java KeyStore (JKS) untuk autentikasi TLS Kafka. Contoh,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadKeyPasswordSecretId: ID secret Google Cloud Secret Manager yang berisi sandi yang akan digunakan untuk mengakses kunci pribadi dalam file Java KeyStore (JKS) untuk autentikasi TLS Kafka. Contoh,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadSaslScramUsernameSecretId: ID secret Google Cloud Secret Manager yang berisi nama pengguna Kafka untuk digunakan dengan autentikasi
SASL_SCRAM
. Misalnya,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadSaslScramPasswordSecretId: ID secret Google Cloud Secret Manager yang berisi sandi Kafka untuk digunakan dengan autentikasi
SASL_SCRAM
. Misalnya,projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - kafkaReadSaslScramTruststoreLocation: Jalur Google Cloud Storage ke file Java TrustStore (JKS) yang berisi sertifikat tepercaya yang akan digunakan untuk memverifikasi identitas broker Kafka.
- kafkaReadSaslScramTruststorePasswordSecretId: ID rahasia Google Cloud Secret Manager yang berisi sandi yang akan digunakan untuk mengakses file Java TrustStore (JKS) untuk autentikasi SASL_SCRAM Kafka. Misalnya,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - schemaFormat: Format skema Kafka. Dapat diberikan sebagai
SINGLE_SCHEMA_FILE
atauSCHEMA_REGISTRY
. JikaSINGLE_SCHEMA_FILE
ditentukan, gunakan skema yang disebutkan dalam file skema avro untuk semua pesan. JikaSCHEMA_REGISTRY
ditentukan, pesan dapat memiliki satu skema atau beberapa skema. Default-nya adalah: SINGLE_SCHEMA_FILE. - confluentAvroSchemaPath: Jalur Google Cloud Storage ke satu file skema Avro yang digunakan untuk mendekode semua pesan dalam topik. Nilai defaultnya adalah kosong.
- schemaRegistryConnectionUrl: URL untuk instance Confluent Schema Registry yang digunakan untuk mengelola skema Avro untuk dekode pesan. Nilai defaultnya adalah kosong.
- binaryAvroSchemaPath: Jalur Google Cloud Storage ke file skema Avro yang digunakan untuk mendekode pesan Avro yang dienkode biner. Nilai defaultnya adalah kosong.
- schemaRegistryAuthenticationMode: Mode autentikasi Schema Registry. Dapat berupa NONE, TLS, atau OAUTH. Default: NONE.
- schemaRegistryTruststoreLocation: Lokasi sertifikat SSL tempat trust store untuk autentikasi ke Schema Registry disimpan. Contoh,
/your-bucket/truststore.jks
. - schemaRegistryTruststorePasswordSecretId: SecretId di Secret Manager tempat sandi untuk mengakses secret di truststore disimpan. Contoh,
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
. - schemaRegistryKeystoreLocation: Lokasi keystore yang berisi sertifikat SSL dan kunci pribadi. Contoh,
/your-bucket/keystore.jks
. - schemaRegistryKeystorePasswordSecretId: SecretId di Secret Manager tempat sandi untuk mengakses file keystore. Misalnya,
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
. - schemaRegistryKeyPasswordSecretId: SecretId sandi yang diperlukan untuk mengakses kunci pribadi klien yang disimpan dalam keystore. Misalnya,
projects/your-project-number/secrets/your-secret-name/versions/your-secret-version
. - schemaRegistryOauthClientId: Client ID yang digunakan untuk mengautentikasi klien Schema Registry dalam mode OAUTH. Diperlukan untuk format pesan AVRO_CONFLUENT_WIRE_FORMAT.
- schemaRegistryOauthClientSecretId: ID rahasia Google Cloud Secret Manager yang berisi Rahasia Klien yang akan digunakan untuk mengautentikasi klien Schema Registry dalam mode OAUTH. Diperlukan untuk format pesan AVRO_CONFLUENT_WIRE_FORMAT. Contoh,
projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>
. - schemaRegistryOauthScope: Cakupan token akses yang digunakan untuk mengautentikasi klien Schema Registry dalam mode OAUTH. Kolom ini bersifat opsional, karena permintaan dapat dilakukan tanpa parameter cakupan yang diteruskan. Contoh,
openid
. - schemaRegistryOauthTokenEndpointUrl: URL berbasis HTTP(S) untuk penyedia identitas OAuth/OIDC yang digunakan untuk mengautentikasi klien Schema Registry dalam mode OAUTH. Diperlukan untuk format pesan AVRO_CONFLUENT_WIRE_FORMAT.
- outputDeadletterTable: Nama tabel BigQuery yang memenuhi syarat sepenuhnya untuk pesan yang gagal. Pesan yang gagal mencapai tabel output karena berbagai alasan (misalnya, skema tidak cocok, JSON salah format) ditulis ke tabel ini. Tabel akan dibuat oleh template. Contoh,
your-project-id:your-dataset.your-table-name
.
Menjalankan template
Konsol
- Buka halaman Dataflow Create job from template. Buka Membuat tugas dari template
- Di kolom Nama tugas, masukkan nama tugas yang unik.
- Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region
default-nya adalah
us-central1
.Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.
- Dari menu drop-down Template Dataflow, pilih the Kafka to Cloud Storage template.
- Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
- Opsional: Untuk beralih dari pemrosesan tepat satu kali ke mode streaming minimal satu kali, pilih Minimal Satu Kali.
- Klik Run job.
gcloud
Di shell atau terminal Anda, jalankan template:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_Gcs_Flex \ --parameters \ readBootstrapServerAndTopic=BOOTSTRAP_SERVER_AND_TOPIC,\ kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\ messageFormat=JSON,\ outputDirectory=gs://STORAGE_BUCKET_NAME,\ useBigQueryDLQ=false
Ganti kode berikut:
PROJECT_ID
: ID Google Cloud project tempat Anda ingin menjalankan tugas DataflowJOB_NAME
: nama tugas unik pilihan AndaREGION_NAME
: region tempat Anda ingin men-deploy tugas Dataflow—misalnya,us-central1
VERSION
: versi template yang ingin Anda gunakanAnda dapat menggunakan nilai berikut:
latest
untuk menggunakan versi template terbaru, yang tersedia di folder induk tanpa tanggal di bucket— gs://dataflow-templates-REGION_NAME/latest/- nama versi, seperti
2023-09-12-00_RC00
, untuk menggunakan versi template tertentu, yang dapat ditemukan bertingkat di folder induk yang diberi tanggal di bucket— gs://dataflow-templates-REGION_NAME/
BOOTSTRAP_SERVER_AND_TOPIC
: alamat dan topik server bootstrap Apache KafkaFormat alamat dan topik server bootstrap bergantung pada jenis cluster:
- Cluster Managed Service for Apache Kafka:
projects/PROJECT_ID/locations/REGION_NAME/clusters/CLUSTER_NAME/topics/TOPIC_NAME
- Cluster Kafka eksternal:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Cluster Managed Service for Apache Kafka:
STORAGE_BUCKET_NAME
: bucket Cloud Storage tempat output ditulis
API
Untuk menjalankan template menggunakan REST API, kirim permintaan HTTP POST. Untuk mengetahui informasi selengkapnya tentang
API dan cakupan otorisasinya, lihat
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "readBootstrapServerAndTopic": "BOOTSTRAP_SERVER_AND_TOPIC", "kafkaReadAuthenticationMode": "APPLICATION_DEFAULT_CREDENTIALS", "messageFormat": "JSON", "outputDirectory": "gs://STORAGE_BUCKET_NAME", "useBigQueryDLQ": "false" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_Gcs_Flex", } }
Ganti kode berikut:
PROJECT_ID
: ID Google Cloud project tempat Anda ingin menjalankan tugas DataflowJOB_NAME
: nama tugas unik pilihan AndaLOCATION
: region tempat Anda ingin men-deploy tugas Dataflow—misalnya,us-central1
VERSION
: versi template yang ingin Anda gunakanAnda dapat menggunakan nilai berikut:
latest
untuk menggunakan versi template terbaru, yang tersedia di folder induk tanpa tanggal di bucket— gs://dataflow-templates-REGION_NAME/latest/- nama versi, seperti
2023-09-12-00_RC00
, untuk menggunakan versi template tertentu, yang dapat ditemukan bertingkat di folder induk yang diberi tanggal di bucket— gs://dataflow-templates-REGION_NAME/
BOOTSTRAP_SERVER_AND_TOPIC
: alamat dan topik server bootstrap Apache KafkaFormat alamat dan topik server bootstrap bergantung pada jenis cluster:
- Cluster Managed Service for Apache Kafka:
projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_NAME/topics/TOPIC_NAME
- Cluster Kafka eksternal:
BOOTSTRAP_SERVER_ADDRESS;TOPIC_NAME
- Cluster Managed Service for Apache Kafka:
STORAGE_BUCKET_NAME
: bucket Cloud Storage tempat output ditulis
Langkah berikutnya
- Pelajari template Dataflow.
- Lihat daftar template yang disediakan Google.