Halaman ini menunjukkan cara menggunakan Dataflow untuk membaca data dari Google Cloud Managed Service for Apache Kafka dan menulis rekaman ke tabel BigQuery. Tutorial ini menggunakan template Apache Kafka ke BigQuery untuk membuat tugas Dataflow.
Ringkasan
Apache Kafka adalah platform open source untuk streaming peristiwa. Kafka biasanya digunakan dalam arsitektur terdistribusi untuk memungkinkan komunikasi antar-komponen yang tidak terikat erat. Anda dapat menggunakan Dataflow untuk membaca peristiwa dari Kafka, memprosesnya, dan menulis hasilnya ke tabel BigQuery untuk analisis lebih lanjut.
Managed Service for Apache Kafka adalah layanan yang membantu Anda menjalankan cluster Kafka yang aman dan skalabel. Google Cloud

Izin yang diperlukan
Akun layanan pekerja Dataflow harus memiliki peran Identity and Access Management (IAM) berikut:
- Klien Kafka Terkelola (
roles/managedkafka.client
) - BigQuery Data Editor (
roles/bigquery.dataEditor
)
Untuk mengetahui informasi selengkapnya, lihat Keamanan dan izin Dataflow.
Membuat cluster Kafka
Pada langkah ini, Anda akan membuat cluster Managed Service for Apache Kafka. Untuk mengetahui informasi selengkapnya, lihat Membuat cluster Managed Service for Apache Kafka.
Konsol
Buka halaman Managed Service for Apache Kafka > Clusters.
Klik Buat.
Di kotak Nama cluster, masukkan nama untuk cluster.
Dalam daftar Region, pilih lokasi untuk cluster.
Klik Buat.
gcloud
Gunakan
perintah managed-kafka clusters create
.
gcloud managed-kafka clusters create CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME
Ganti kode berikut:
CLUSTER
: nama untuk clusterREGION
: region tempat Anda membuat subnetPROJECT_ID
: project ID AndaSUBNET_NAME
: subnet tempat Anda ingin men-deploy cluster
Pembuatan cluster biasanya memerlukan waktu 20-30 menit.
Membuat topik Kafka
Setelah cluster Managed Service for Apache Kafka dibuat, buat topik.
Konsol
Buka halaman Managed Service for Apache Kafka > Clusters.
Klik nama cluster.
Di halaman detail cluster, klik Create Topic.
Di kotak Nama topik, masukkan nama untuk topik.
Klik Buat.
gcloud
Gunakan
perintah managed-kafka topics create
.
gcloud managed-kafka topics create TOPIC_NAME \
--cluster=CLUSTER \
--location=REGION \
--partitions=10 \
--replication-factor=3
Ganti kode berikut:
TOPIC_NAME
: nama topik yang akan dibuat
Membuat tabel BigQuery
Pada langkah ini, Anda akan membuat tabel BigQuery dengan skema berikut:
Nama kolom | Jenis data |
---|---|
name |
STRING |
customer_id |
INTEGER |
Jika Anda belum memiliki set data BigQuery, buat set data terlebih dahulu. Untuk mengetahui informasi selengkapnya, lihat Membuat set data. Kemudian, buat tabel kosong baru:
Konsol
Buka halaman BigQuery.
Di panel Explorer, luaskan project Anda, lalu pilih set data.
Di bagian info Dataset, klik
Buat tabel.Dalam daftar Create table from, pilih Empty table.
Di kotak Table, masukkan nama tabel.
Di bagian Schema, klik Edit sebagai teks.
Tempel definisi skema berikut:
name:STRING, customer_id:INTEGER
Klik Create table.
gcloud
Gunakan perintah bq mk
.
bq mk --table \
PROJECT_ID:DATASET_NAME.TABLE_NAME \
name:STRING,customer_id:INTEGER
Ganti kode berikut:
PROJECT_ID
: project ID AndaDATASET_NAME
: nama set dataTABLE_NAME
: nama tabel yang akan dibuat
Menjalankan tugas Dataflow
Setelah membuat cluster Kafka dan tabel BigQuery, jalankan template Dataflow.
Konsol
Pertama, dapatkan alamat server bootstrap cluster:
Di konsol Google Cloud , buka halaman Clusters.
Klik nama cluster.
Klik tab Konfigurasi.
Salin alamat server bootstrap dari Bootstrap URL.
Selanjutnya, jalankan template untuk membuat tugas Dataflow:
Buka halaman Dataflow > Jobs.
Klik Buat tugas dari template.
Di kolom Job Name, masukkan
kafka-to-bq
.Untuk Regional endpoint, pilih region tempat cluster Managed Service for Apache Kafka Anda berada.
Pilih template "Kafka to BigQuery".
Masukkan parameter template berikut:
- Server bootstrap Kafka: alamat server bootstrap
- Topik Kafka sumber: nama topik yang akan dibaca
- Mode autentikasi sumber Kafka:
APPLICATION_DEFAULT_CREDENTIALS
- Format pesan Kafka:
JSON
- Strategi penamaan tabel:
SINGLE_TABLE_NAME
- Tabel output BigQuery: Tabel BigQuery, diformat
sebagai berikut:
PROJECT_ID
:DATASET_NAME
.TABLE_NAME
Di bagian Antrean pesan yang tidak terkirim, centang Tulis error ke BigQuery.
Masukkan nama tabel BigQuery untuk antrean pesan yang tidak terkirim, yang diformat sebagai berikut:
PROJECT_ID
:DATASET_NAME
.ERROR_TABLE_NAME
Jangan membuat tabel ini sebelumnya. Pipeline akan membuatnya.
Klik Run job.
gcloud
Gunakan
perintah dataflow flex-template run
.
gcloud dataflow flex-template run kafka-to-bq \ --template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \ --region LOCATION \ --parameters \ readBootstrapServerAndTopic=projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\ persistKafkaKey=false,\ writeMode=SINGLE_TABLE_NAME,\ kafkaReadOffset=earliest,\ kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\ messageFormat=JSON,\ outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME\ useBigQueryDLQ=true,\ outputDeadletterTable=PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME
Ganti variabel berikut:
LOCATION
: region tempat Managed Service for Apache Kafka Anda beradaPROJECT_ID
: nama Google Cloud project AndaCLUSTER_ID
: clusterTOPIC
: nama topik KafkaDATASET_NAME
: nama set dataTABLE_NAME
: nama tabelERROR_TABLE_NAME
: nama tabel BigQuery untuk antrean pesan yang tidak terkirim
Jangan membuat tabel untuk antrean pesan yang tidak terkirim sebelumnya. Pipeline akan membuatnya.
Mengirim pesan ke Kafka
Setelah tugas Dataflow dimulai, Anda dapat mengirim pesan ke Kafka, dan pipeline akan menuliskannya ke BigQuery.
Buat VM di subnet yang sama dengan cluster Kafka dan instal alat command line Kafka. Untuk mendapatkan petunjuk mendetail, lihat Menyiapkan mesin klien di Memublikasikan dan menggunakan pesan dengan CLI.
Jalankan perintah berikut untuk menulis pesan ke topik Kafka:
kafka-console-producer.sh \ --topic TOPIC \ --bootstrap-server bootstrap.CLUSTER_ID.LOCATION.managedkafka.PROJECT_ID.cloud.goog:9092 \ --producer.config client.properties
Ganti variabel berikut:
TOPIC
: nama topik KafkaCLUSTER_ID
: nama clusterLOCATION
: region tempat cluster Anda beradaPROJECT_ID
: nama Google Cloud project Anda
Di perintah, masukkan baris teks berikut untuk mengirim pesan ke Kafka:
{"name": "Alice", "customer_id": 1} {"name": "Bob", "customer_id": 2} {"name": "Charles", "customer_id": 3}
Menggunakan antrean pesan yang dihentikan pengirimannya
Saat tugas berjalan, pipeline mungkin gagal menulis setiap pesan ke BigQuery. Kemungkinan error meliputi:
- Error serialisasi, termasuk JSON yang diformat dengan buruk.
- Error konversi jenis, yang disebabkan oleh ketidakcocokan dalam skema tabel dan data JSON.
- Kolom tambahan dalam data JSON yang tidak ada dalam skema tabel.
Error ini tidak menyebabkan tugas gagal, dan tidak muncul sebagai error di log tugas Dataflow. Sebagai gantinya, pipeline menggunakan antrean surat yang tidak terkirim untuk menangani jenis error ini.
Untuk mengaktifkan antrean pesan yang tidak terkirim saat Anda menjalankan template, tetapkan parameter template berikut:
useBigQueryDLQ
:true
outputDeadletterTable
: nama tabel BigQuery yang sepenuhnya memenuhi syarat; misalnya,my-project:dataset1.errors
Pipeline akan otomatis membuat tabel. Jika terjadi error saat memproses pesan Kafka, pipeline akan menulis entri error ke tabel.
Contoh pesan error:
Jenis error | Data peristiwa | errorMessage |
---|---|---|
Error serialisasi | "Hello world" | Gagal melakukan serialisasi json ke baris tabel: "Hello world" |
Error konversi jenis | {"name":"Emily","customer_id":"abc"} | { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 } |
Kolom tidak dikenal | {"name":"Zoe","age":34} | { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: customer_id.", "reason" : "invalid" } ], "index" : 0 } |
Bekerja dengan jenis data BigQuery
Secara internal, konektor I/O Kafka mengonversi payload pesan JSON menjadi objek TableRow
Apache Beam, dan menerjemahkan nilai kolom TableRow
menjadi jenis BigQuery.
Tabel berikut menunjukkan representasi JSON dari jenis data BigQuery.
Jenis BigQuery | Representasi JSON |
---|---|
ARRAY |
[1.2,3] |
BOOL |
true |
DATE |
"2022-07-01" |
DATETIME |
"2022-07-01 12:00:00.00" |
DECIMAL |
5.2E11 |
FLOAT64 |
3.142 |
GEOGRAPHY |
"POINT(1 2)" Tentukan geografi menggunakan well-known text (WKT) atau GeoJSON, yang diformat sebagai string. Untuk mengetahui informasi selengkapnya, lihat Memuat data geospasial. |
INT64 |
10 |
INTERVAL |
"0-13 370 48:61:61" |
STRING |
"string_val" |
TIMESTAMP |
"2022-07-01T12:00:00.00Z" Gunakan metode |
Data terstruktur
Jika pesan JSON Anda mengikuti skema yang konsisten, Anda dapat merepresentasikan objek JSON menggunakan jenis data STRUCT
di BigQuery.
Dalam contoh berikut, kolom answers
adalah objek JSON dengan dua subkolom, a
dan b
:
{"name":"Emily","answers":{"a":"yes","b":"no"}}
Pernyataan SQL berikut membuat tabel BigQuery dengan skema yang kompatibel:
CREATE TABLE my_dataset.kafka_events (name STRING, answers STRUCT<a STRING, b STRING>);
Tabel yang dihasilkan akan terlihat seperti berikut:
+-------+----------------------+
| name | answers |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+
Data semi-terstruktur
Jika pesan JSON Anda tidak mengikuti skema yang ketat, pertimbangkan untuk menyimpannya di BigQuery sebagai jenis data JSON
.
Dengan menyimpan data JSON sebagai jenis JSON
, Anda tidak perlu menentukan skema di awal. Setelah penyerapan data, Anda dapat membuat kueri data menggunakan operator akses kolom (notasi titik) dan akses array di GoogleSQL. Untuk mengetahui informasi selengkapnya, lihat Bekerja dengan data JSON di GoogleSQL.
Menggunakan UDF untuk mengubah data
Tutorial ini mengasumsikan bahwa pesan Kafka diformat sebagai JSON, dan skema tabel BigQuery cocok dengan data JSON, tanpa transformasi yang diterapkan pada data.
Secara opsional, Anda dapat memberikan fungsi yang ditentukan pengguna (UDF) JavaScript yang mentransformasi data sebelum ditulis ke BigQuery. UDF juga dapat melakukan pemrosesan tambahan, seperti memfilter, menghapus informasi identitas pribadi (PII), atau memperkaya data dengan kolom tambahan.
Untuk mengetahui informasi selengkapnya, lihat Membuat fungsi yang ditentukan pengguna untuk template Dataflow.