Menulis data dari Kafka ke BigQuery menggunakan Dataflow

Halaman ini menunjukkan cara menggunakan Dataflow untuk membaca data dari Google Cloud Managed Service for Apache Kafka dan menulis rekaman ke tabel BigQuery. Tutorial ini menggunakan template Apache Kafka ke BigQuery untuk membuat tugas Dataflow.

Ringkasan

Apache Kafka adalah platform open source untuk streaming peristiwa. Kafka biasanya digunakan dalam arsitektur terdistribusi untuk memungkinkan komunikasi antar-komponen yang tidak terikat erat. Anda dapat menggunakan Dataflow untuk membaca peristiwa dari Kafka, memprosesnya, dan menulis hasilnya ke tabel BigQuery untuk analisis lebih lanjut.

Managed Service for Apache Kafka adalah layanan yang membantu Anda menjalankan cluster Kafka yang aman dan skalabel. Google Cloud

Membaca peristiwa Kafka ke BigQuery
Arsitektur berbasis peristiwa menggunakan Apache Kafka

Izin yang diperlukan

Akun layanan pekerja Dataflow harus memiliki peran Identity and Access Management (IAM) berikut:

  • Klien Kafka Terkelola (roles/managedkafka.client)
  • BigQuery Data Editor (roles/bigquery.dataEditor)

Untuk mengetahui informasi selengkapnya, lihat Keamanan dan izin Dataflow.

Membuat cluster Kafka

Pada langkah ini, Anda akan membuat cluster Managed Service for Apache Kafka. Untuk mengetahui informasi selengkapnya, lihat Membuat cluster Managed Service for Apache Kafka.

Konsol

  1. Buka halaman Managed Service for Apache Kafka > Clusters.

    Buka Cluster

  2. Klik Buat.

  3. Di kotak Nama cluster, masukkan nama untuk cluster.

  4. Dalam daftar Region, pilih lokasi untuk cluster.

  5. Klik Buat.

gcloud

Gunakan perintah managed-kafka clusters create.

gcloud managed-kafka clusters create CLUSTER \
--location=REGION \
--cpu=3 \
--memory=3GiB \
--subnets=projects/PROJECT_ID/regions/REGION/subnetworks/SUBNET_NAME

Ganti kode berikut:

  • CLUSTER: nama untuk cluster
  • REGION: region tempat Anda membuat subnet
  • PROJECT_ID: project ID Anda
  • SUBNET_NAME: subnet tempat Anda ingin men-deploy cluster

Pembuatan cluster biasanya memerlukan waktu 20-30 menit.

Membuat topik Kafka

Setelah cluster Managed Service for Apache Kafka dibuat, buat topik.

Konsol

  1. Buka halaman Managed Service for Apache Kafka > Clusters.

    Buka Cluster

  2. Klik nama cluster.

  3. Di halaman detail cluster, klik Create Topic.

  4. Di kotak Nama topik, masukkan nama untuk topik.

  5. Klik Buat.

gcloud

Gunakan perintah managed-kafka topics create.

gcloud managed-kafka topics create TOPIC_NAME \
--cluster=CLUSTER \
--location=REGION \
--partitions=10 \
--replication-factor=3

Ganti kode berikut:

  • TOPIC_NAME: nama topik yang akan dibuat

Membuat tabel BigQuery

Pada langkah ini, Anda akan membuat tabel BigQuery dengan skema berikut:

Nama kolom Jenis data
name STRING
customer_id INTEGER

Jika Anda belum memiliki set data BigQuery, buat set data terlebih dahulu. Untuk mengetahui informasi selengkapnya, lihat Membuat set data. Kemudian, buat tabel kosong baru:

Konsol

  1. Buka halaman BigQuery.

    Buka BigQuery

  2. Di panel Explorer, luaskan project Anda, lalu pilih set data.

  3. Di bagian info Dataset, klik Buat tabel.

  4. Dalam daftar Create table from, pilih Empty table.

  5. Di kotak Table, masukkan nama tabel.

  6. Di bagian Schema, klik Edit sebagai teks.

  7. Tempel definisi skema berikut:

    name:STRING,
    customer_id:INTEGER
    
  8. Klik Create table.

gcloud

Gunakan perintah bq mk.

bq mk --table \
  PROJECT_ID:DATASET_NAME.TABLE_NAME \
  name:STRING,customer_id:INTEGER

Ganti kode berikut:

  • PROJECT_ID: project ID Anda
  • DATASET_NAME: nama set data
  • TABLE_NAME: nama tabel yang akan dibuat

Menjalankan tugas Dataflow

Setelah membuat cluster Kafka dan tabel BigQuery, jalankan template Dataflow.

Konsol

Pertama, dapatkan alamat server bootstrap cluster:

  1. Di konsol Google Cloud , buka halaman Clusters.

    Buka Cluster

  2. Klik nama cluster.

  3. Klik tab Konfigurasi.

  4. Salin alamat server bootstrap dari Bootstrap URL.

Selanjutnya, jalankan template untuk membuat tugas Dataflow:

  1. Buka halaman Dataflow > Jobs.

    Buka Tugas

  2. Klik Buat tugas dari template.

  3. Di kolom Job Name, masukkan kafka-to-bq.

  4. Untuk Regional endpoint, pilih region tempat cluster Managed Service for Apache Kafka Anda berada.

  5. Pilih template "Kafka to BigQuery".

  6. Masukkan parameter template berikut:

    • Server bootstrap Kafka: alamat server bootstrap
    • Topik Kafka sumber: nama topik yang akan dibaca
    • Mode autentikasi sumber Kafka: APPLICATION_DEFAULT_CREDENTIALS
    • Format pesan Kafka: JSON
    • Strategi penamaan tabel: SINGLE_TABLE_NAME
    • Tabel output BigQuery: Tabel BigQuery, diformat sebagai berikut: PROJECT_ID:DATASET_NAME.TABLE_NAME
  7. Di bagian Antrean pesan yang tidak terkirim, centang Tulis error ke BigQuery.

  8. Masukkan nama tabel BigQuery untuk antrean pesan yang tidak terkirim, yang diformat sebagai berikut: PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME

    Jangan membuat tabel ini sebelumnya. Pipeline akan membuatnya.

  9. Klik Run job.

gcloud

Gunakan perintah dataflow flex-template run.

gcloud dataflow flex-template run kafka-to-bq \
--template-file-gcs-location gs://dataflow-templates/latest/flex/Kafka_to_BigQuery \
--region LOCATION \
--parameters \
readBootstrapServerAndTopic=projects/PROJECT_ID/locations/LOCATION/clusters/CLUSTER_ID/topics/TOPIC,\
persistKafkaKey=false,\
writeMode=SINGLE_TABLE_NAME,\
kafkaReadOffset=earliest,\
kafkaReadAuthenticationMode=APPLICATION_DEFAULT_CREDENTIALS,\
messageFormat=JSON,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME\
useBigQueryDLQ=true,\
outputDeadletterTable=PROJECT_ID:DATASET_NAME.ERROR_TABLE_NAME

Ganti variabel berikut:

  • LOCATION: region tempat Managed Service for Apache Kafka Anda berada
  • PROJECT_ID: nama Google Cloud project Anda
  • CLUSTER_ID: cluster
  • TOPIC: nama topik Kafka
  • DATASET_NAME: nama set data
  • TABLE_NAME: nama tabel
  • ERROR_TABLE_NAME: nama tabel BigQuery untuk antrean pesan yang tidak terkirim

Jangan membuat tabel untuk antrean pesan yang tidak terkirim sebelumnya. Pipeline akan membuatnya.

Mengirim pesan ke Kafka

Setelah tugas Dataflow dimulai, Anda dapat mengirim pesan ke Kafka, dan pipeline akan menuliskannya ke BigQuery.

  1. Buat VM di subnet yang sama dengan cluster Kafka dan instal alat command line Kafka. Untuk mendapatkan petunjuk mendetail, lihat Menyiapkan mesin klien di Memublikasikan dan menggunakan pesan dengan CLI.

  2. Jalankan perintah berikut untuk menulis pesan ke topik Kafka:

    kafka-console-producer.sh \
     --topic TOPIC \
     --bootstrap-server bootstrap.CLUSTER_ID.LOCATION.managedkafka.PROJECT_ID.cloud.goog:9092 \
     --producer.config client.properties

    Ganti variabel berikut:

    • TOPIC: nama topik Kafka
    • CLUSTER_ID: nama cluster
    • LOCATION: region tempat cluster Anda berada
    • PROJECT_ID: nama Google Cloud project Anda
  3. Di perintah, masukkan baris teks berikut untuk mengirim pesan ke Kafka:

    {"name": "Alice", "customer_id": 1}
    {"name": "Bob", "customer_id": 2}
    {"name": "Charles", "customer_id": 3}
    

Menggunakan antrean pesan yang dihentikan pengirimannya

Saat tugas berjalan, pipeline mungkin gagal menulis setiap pesan ke BigQuery. Kemungkinan error meliputi:

  • Error serialisasi, termasuk JSON yang diformat dengan buruk.
  • Error konversi jenis, yang disebabkan oleh ketidakcocokan dalam skema tabel dan data JSON.
  • Kolom tambahan dalam data JSON yang tidak ada dalam skema tabel.

Error ini tidak menyebabkan tugas gagal, dan tidak muncul sebagai error di log tugas Dataflow. Sebagai gantinya, pipeline menggunakan antrean surat yang tidak terkirim untuk menangani jenis error ini.

Untuk mengaktifkan antrean pesan yang tidak terkirim saat Anda menjalankan template, tetapkan parameter template berikut:

  • useBigQueryDLQ: true
  • outputDeadletterTable: nama tabel BigQuery yang sepenuhnya memenuhi syarat; misalnya, my-project:dataset1.errors

Pipeline akan otomatis membuat tabel. Jika terjadi error saat memproses pesan Kafka, pipeline akan menulis entri error ke tabel.

Contoh pesan error:

Jenis error Data peristiwa errorMessage
Error serialisasi "Hello world" Gagal melakukan serialisasi json ke baris tabel: "Hello world"
Error konversi jenis {"name":"Emily","customer_id":"abc"} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "Cannot convert value to integer (bad value): abc", "reason" : "invalid" } ], "index" : 0 }
Kolom tidak dikenal {"name":"Zoe","age":34} { "errors" : [ { "debugInfo" : "", "location" : "age", "message" : "no such field: customer_id.", "reason" : "invalid" } ], "index" : 0 }

Bekerja dengan jenis data BigQuery

Secara internal, konektor I/O Kafka mengonversi payload pesan JSON menjadi objek TableRow Apache Beam, dan menerjemahkan nilai kolom TableRow menjadi jenis BigQuery.

Tabel berikut menunjukkan representasi JSON dari jenis data BigQuery.

Jenis BigQuery Representasi JSON
ARRAY [1.2,3]
BOOL true
DATE "2022-07-01"
DATETIME "2022-07-01 12:00:00.00"
DECIMAL 5.2E11
FLOAT64 3.142
GEOGRAPHY "POINT(1 2)"

Tentukan geografi menggunakan well-known text (WKT) atau GeoJSON, yang diformat sebagai string. Untuk mengetahui informasi selengkapnya, lihat Memuat data geospasial.

INT64 10
INTERVAL "0-13 370 48:61:61"
STRING "string_val"
TIMESTAMP "2022-07-01T12:00:00.00Z"

Gunakan metode Date.toJSON JavaScript untuk memformat nilai.

Data terstruktur

Jika pesan JSON Anda mengikuti skema yang konsisten, Anda dapat merepresentasikan objek JSON menggunakan jenis data STRUCT di BigQuery.

Dalam contoh berikut, kolom answers adalah objek JSON dengan dua subkolom, a dan b:

{"name":"Emily","answers":{"a":"yes","b":"no"}}

Pernyataan SQL berikut membuat tabel BigQuery dengan skema yang kompatibel:

CREATE TABLE my_dataset.kafka_events (name STRING, answers STRUCT<a STRING, b STRING>);

Tabel yang dihasilkan akan terlihat seperti berikut:

+-------+----------------------+
| name  |       answers        |
+-------+----------------------+
| Emily | {"a":"yes","b":"no"} |
+-------+----------------------+

Data semi-terstruktur

Jika pesan JSON Anda tidak mengikuti skema yang ketat, pertimbangkan untuk menyimpannya di BigQuery sebagai jenis data JSON. Dengan menyimpan data JSON sebagai jenis JSON, Anda tidak perlu menentukan skema di awal. Setelah penyerapan data, Anda dapat membuat kueri data menggunakan operator akses kolom (notasi titik) dan akses array di GoogleSQL. Untuk mengetahui informasi selengkapnya, lihat Bekerja dengan data JSON di GoogleSQL.

Menggunakan UDF untuk mengubah data

Tutorial ini mengasumsikan bahwa pesan Kafka diformat sebagai JSON, dan skema tabel BigQuery cocok dengan data JSON, tanpa transformasi yang diterapkan pada data.

Secara opsional, Anda dapat memberikan fungsi yang ditentukan pengguna (UDF) JavaScript yang mentransformasi data sebelum ditulis ke BigQuery. UDF juga dapat melakukan pemrosesan tambahan, seperti memfilter, menghapus informasi identitas pribadi (PII), atau memperkaya data dengan kolom tambahan.

Untuk mengetahui informasi selengkapnya, lihat Membuat fungsi yang ditentukan pengguna untuk template Dataflow.

Langkah berikutnya