Template Apache Kafka ke BigQuery

Template Apache Kafka ke BigQuery adalah pipeline streaming yang menyerap data teks dari Apache Kafka, menjalankan fungsi yang ditentukan pengguna (UDF), dan menghasilkan kumpulan data yang dihasilkan ke BigQuery. Setiap error yang terjadi dalam transformasi data, eksekusi UDF, atau penyisipan ke dalam tabel output akan disisipkan ke tabel error terpisah di BigQuery. Jika tabel error tidak ada sebelum dieksekusi, maka tabel tersebut akan dibuat.

Persyaratan pipeline

  • Tabel BigQuery output harus ada.
  • Server broker Apache Kafka harus berjalan dan dapat dijangkau dari mesin pekerja Dataflow.
  • Topik Apache Kafka harus ada dan pesan harus dienkode dalam format JSON yang valid.

Parameter template

Parameter Deskripsi
outputTableSpec Lokasi tabel output BigQuery yang menjadi tujuan penulisan pesan Apache Kafka, dalam format my-project:dataset.table
inputTopics Topik input Apache Kafka yang akan dibaca dalam daftar yang dipisahkan koma. Contoh: messages
bootstrapServers Alamat host server broker Apache Kafka yang berjalan dalam daftar yang dipisahkan koma, setiap alamat host dalam format 35.70.252.199:9092
javascriptTextTransformGcsPath Opsional: URI Cloud Storage dari file .js yang menentukan fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakan. Misalnya, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName Opsional: Nama fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakan. Misalnya, jika kode fungsi JavaScript Anda adalah myTransform(inJson) { /*...do stuff...*/ }, nama fungsi adalah myTransform. Untuk contoh UDF JavaScript, lihat Contoh UDF.
javascriptTextTransformReloadIntervalMinutes Opsional: Menentukan seberapa sering harus memuat ulang UDF, dalam hitungan menit. Jika nilainya lebih besar dari 0, Dataflow secara berkala akan memeriksa file UDF di Cloud Storage, dan memuat ulang UDF jika file diubah. Parameter ini memungkinkan Anda memperbarui UDF saat pipeline berjalan, tanpa perlu memulai ulang tugas. Jika nilainya adalah 0, pemuatan ulang UDF akan dinonaktifkan. Nilai defaultnya adalah 0.
outputDeadletterTable Opsional: Tabel BigQuery untuk pesan yang gagal mencapai tabel output, dalam format my-project:dataset.my-deadletter-table. Jika tidak ada, tabel akan dibuat selama eksekusi pipeline. Jika tidak ditentukan, <outputTableSpec>_error_records akan digunakan.
useStorageWriteApi Opsional: Jika true, pipeline akan menggunakan BigQuery Storage Write API. Nilai defaultnya adalah false. Untuk informasi selengkapnya, lihat Menggunakan Storage Write API.
useStorageWriteApiAtLeastOnce Opsional: Saat menggunakan Storage Write API, menentukan semantik penulisan. Untuk menggunakan semantik minimal satu kali, tetapkan parameter ini ke true. Untuk menggunakan semantik tepat satu kali, tetapkan parameter ke false. Parameter ini hanya berlaku jika useStorageWriteApi adalah true. Nilai defaultnya adalah false.
numStorageWriteApiStreams Opsional: Menentukan jumlah aliran operasi tulis saat menggunakan Storage Write API. Jika useStorageWriteApi adalah true dan useStorageWriteApiAtLeastOnce adalah false, Anda harus menetapkan parameter ini.
storageWriteApiTriggeringFrequencySec Opsional: Saat menggunakan Storage Write API, menentukan frekuensi pemicu, dalam hitungan detik. Jika useStorageWriteApi adalah true dan useStorageWriteApiAtLeastOnce adalah false, Anda harus menetapkan parameter ini.

Fungsi yang ditentukan pengguna

Anda juga dapat memperluas template ini dengan menulis fungsi yang ditentukan pengguna (UDF). Template memanggil UDF untuk setiap elemen input. Payload elemen diserialisasi sebagai string JSON. Untuk mengetahui informasi selengkapnya, lihat Membuat fungsi yang ditentukan pengguna untuk template Dataflow.

Spesifikasi fungsi

UDF memiliki spesifikasi berikut:

  • Input: nilai data Kafka, yang diserialisasi sebagai string JSON.
  • Output: string JSON yang cocok dengan skema tabel tujuan BigQuery.

Menjalankan template

Konsol

  1. Buka halaman Create job from template Dataflow.
  2. Buka Buat tugas dari template
  3. Di kolom Job name, masukkan nama pekerjaan yang unik.
  4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region default-nya adalah us-central1.

    Untuk daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

  5. Dari menu drop-down Dataflow template, pilih the Kafka to BigQuery template.
  6. Di kolom parameter yang disediakan, masukkan parameter value Anda.
  7. Opsional: Untuk beralih dari pemrosesan tepat satu kali ke mode streaming minimal satu kali, pilih Minimal Sekali.
  8. Klik Run job.

gcloud

Di shell atau terminal Anda, jalankan template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Kafka_to_BigQuery \
    --parameters \
outputTableSpec=BIGQUERY_TABLE,\
inputTopics=KAFKA_TOPICS,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
bootstrapServers=KAFKA_SERVER_ADDRESSES
  

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama pekerjaan unik pilihan Anda
  • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow, misalnya us-central1
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • BIGQUERY_TABLE: nama tabel BigQuery Anda
  • KAFKA_TOPICS: daftar topik Apache Kakfa. Jika ada beberapa topik yang diberikan, ikuti instructions tentang cara meng-escape koma.
  • PATH_TO_JAVASCRIPT_UDF_FILE: URI Cloud Storage dari file .js yang menentukan fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakan—misalnya, gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: nama fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakan

    Misalnya, jika kode fungsi JavaScript Anda adalah myTransform(inJson) { /*...do stuff...*/ }, nama fungsi adalah myTransform. Untuk contoh UDF JavaScript, lihat Contoh UDF.

  • KAFKA_SERVER_ADDRESSES: daftar alamat IP server broker Apache Kafka. Setiap alamat IP harus memiliki nomor port yang dapat diakses oleh server. Contoh: 35.70.252.199:9092. Jika ada beberapa alamat yang diberikan, ikuti instructions tentang cara meng-escape koma.

API

Untuk menjalankan template menggunakan REST API, kirim permintaan HTTP POST. Untuk informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "outputTableSpec": "BIGQUERY_TABLE",
          "inputTopics": "KAFKA_TOPICS",
          "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
          "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
          "bootstrapServers": "KAFKA_SERVER_ADDRESSES"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Kafka_to_BigQuery",
   }
}
  

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama pekerjaan unik pilihan Anda
  • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow, misalnya us-central1
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • BIGQUERY_TABLE: nama tabel BigQuery Anda
  • KAFKA_TOPICS: daftar topik Apache Kakfa. Jika ada beberapa topik yang diberikan, ikuti instructions tentang cara meng-escape koma.
  • PATH_TO_JAVASCRIPT_UDF_FILE: URI Cloud Storage dari file .js yang menentukan fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakan—misalnya, gs://my-bucket/my-udfs/my_file.js
  • JAVASCRIPT_FUNCTION: nama fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakan

    Misalnya, jika kode fungsi JavaScript Anda adalah myTransform(inJson) { /*...do stuff...*/ }, nama fungsi adalah myTransform. Untuk contoh UDF JavaScript, lihat Contoh UDF.

  • KAFKA_SERVER_ADDRESSES: daftar alamat IP server broker Apache Kafka. Setiap alamat IP harus memiliki nomor port yang dapat diakses oleh server. Contoh: 35.70.252.199:9092. Jika ada beberapa alamat yang diberikan, ikuti instructions tentang cara meng-escape koma.

Untuk mengetahui informasi selengkapnya, lihat Menulis data dari Kafka ke BigQuery dengan Dataflow.

Langkah selanjutnya