Streaming dari Pub/Sub ke BigQuery


Tutorial ini menggunakan template Pub/Sub Subscription to BigQuery untuk membuat dan menjalankan tugas template Dataflow menggunakan konsol atau Google Cloud CLI. Google Cloud Tutorial ini memandu Anda melalui contoh pipeline streaming yang membaca pesan berenkode JSON dari Pub/Sub dan menuliskannya ke tabel BigQuery.

Pipeline analisis streaming dan integrasi data menggunakan Pub/Sub untuk menyerap dan mendistribusikan data. Pub/Sub memungkinkan Anda membuat sistem produsen dan konsumen peristiwa, yang disebut penayang dan pelanggan. Penayang mengirimkan peristiwa ke layanan Pub/Sub secara asinkron, dan Pub/Sub mengirimkan peristiwa tersebut ke semua layanan yang perlu meresponsnya.

Dataflow adalah layanan terkelola sepenuhnya untuk mengubah dan memperkaya data dalam mode streaming (real-time) dan batch. Layanan ini menyediakan lingkungan pengembangan pipeline yang disederhanakan yang menggunakan Apache Beam SDK untuk mengubah data yang masuk, lalu menghasilkan data yang telah diubah.

Manfaat alur kerja ini adalah Anda dapat menggunakan UDF untuk mentransformasi data pesan sebelum ditulis ke BigQuery.

Sebelum menjalankan pipeline Dataflow untuk skenario ini, pertimbangkan apakah langganan Pub/Sub BigQuery dengan UDF memenuhi persyaratan Anda.

Tujuan

  • Buat topik Pub/Sub.
  • Buat set data BigQuery dengan tabel dan skema.
  • Gunakan template streaming yang disediakan Google untuk melakukan streaming data dari langganan Pub/Sub ke BigQuery menggunakan Dataflow.

Biaya

Dalam dokumen ini, Anda akan menggunakan komponen Google Cloudyang dapat ditagih berikut:

  • Dataflow
  • Pub/Sub
  • Cloud Storage
  • BigQuery

Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda, gunakan kalkulator harga.

Pengguna Google Cloud baru mungkin memenuhi syarat untuk mendapatkan uji coba gratis.

Setelah menyelesaikan tugas yang dijelaskan dalam dokumen ini, Anda dapat menghindari penagihan berkelanjutan dengan menghapus resource yang Anda buat. Untuk mengetahui informasi selengkapnya, lihat Pembersihan.

Sebelum memulai

Bagian ini menunjukkan cara memilih project, mengaktifkan API, dan memberikan peran yang sesuai ke akun pengguna Anda dan ke akun layanan pekerja.

Konsol

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.

    Enable the APIs

  8. Untuk menyelesaikan langkah-langkah dalam tutorial ini, akun pengguna Anda harus memiliki peran Pengguna Akun Layanan. Akun layanan default Compute Engine harus memiliki peran berikut: Dataflow Worker, Dataflow Admin, Pub/Sub Editor, Storage Object Admin, dan BigQuery Data Editor. Untuk menambahkan peran yang diperlukan di konsol Google Cloud :

    1. Di konsol Google Cloud , buka halaman IAM.

      Buka IAM
    2. Pilih project Anda.
    3. Di baris yang berisi akun pengguna Anda, klik Edit akun utama, lalu klik Tambahkan peran lain.
    4. Di menu drop-down, pilih peran Service Account User.
    5. Di baris yang berisi akun layanan default Compute Engine, klik Edit akun utama, lalu klik Tambahkan peran lain.
    6. Di menu drop-down, pilih peran Dataflow Worker.
    7. Ulangi untuk peran Dataflow Admin, Pub/Sub Editor, Storage Object Admin, dan BigQuery Data Editor, lalu klik Save.

      Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Memberikan peran IAM menggunakan konsol.

gcloud

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com
  8. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  10. Install the Google Cloud CLI.

  11. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  12. To initialize the gcloud CLI, run the following command:

    gcloud init
  13. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  14. Make sure that billing is enabled for your Google Cloud project.

  15. Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs:

    gcloud services enable compute.googleapis.com dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com
  16. If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  17. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  18. Beri peran ke akun layanan default Compute Engine Anda. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE

    Ganti kode berikut:

    • PROJECT_ID: project ID Anda.
    • PROJECT_NUMBER: nomor project Anda. Untuk menemukan nomor project Anda, gunakan perintah gcloud projects describe.
    • SERVICE_ACCOUNT_ROLE: setiap peran individual.

Membuat bucket Cloud Storage

Mulai dengan membuat bucket Cloud Storage menggunakan Google Cloud konsol atau Google Cloud CLI. Pipeline Dataflow menggunakan bucket ini sebagai lokasi penyimpanan sementara.

Konsol

  1. Di Google Cloud konsol, buka halaman Bucket Cloud Storage.

    Buka Buckets

  2. Klik Buat.

  3. Di halaman Buat bucket, untuk Beri nama bucket Anda, masukkan nama yang memenuhi persyaratan penamaan bucket. Nama bucket Cloud Storage harus unik secara global. Jangan pilih opsi lainnya.

  4. Klik Buat.

gcloud

Gunakan perintah gcloud storage buckets create:

gcloud storage buckets create gs://BUCKET_NAME

Ganti BUCKET_NAME dengan nama untuk bucket Cloud Storage Anda yang memenuhi persyaratan penamaan bucket. Nama bucket Cloud Storage harus unik secara global.

Membuat topik dan langganan Pub/Sub

Buat topik Pub/Sub, lalu buat langganan ke topik tersebut.

Konsol

Untuk membuat topik, selesaikan langkah-langkah berikut.

  1. Di konsol Google Cloud , buka halaman Topics Pub/Sub.

    Buka Topik

  2. Klik Create topic.

  3. Di kolom Topic ID, masukkan ID untuk topik Anda. Untuk informasi tentang cara memberi nama topik, lihat Panduan untuk memberi nama topik atau langganan.

  4. Tetapkan opsi Tambahkan langganan default. Jangan pilih opsi lainnya.

  5. Klik Buat.

  6. Di halaman detail topik, nama langganan yang dibuat dicantumkan di bagian ID Langganan. Catat nilai ini untuk langkah-langkah selanjutnya.

gcloud

Untuk membuat topik, jalankan perintah gcloud pubsub topics create. Untuk mengetahui informasi tentang cara memberi nama langganan, lihat Panduan untuk memberi nama topik atau langganan.

gcloud pubsub topics create TOPIC_ID

Ganti TOPIC_ID dengan nama untuk topik Pub/Sub Anda.

Untuk membuat langganan ke topik Anda, jalankan perintah gcloud pubsub subscriptions create:

gcloud pubsub subscriptions create --topic TOPIC_ID SUBSCRIPTION_ID

Ganti SUBSCRIPTION_ID dengan nama untuk langganan Pub/Sub Anda.

Membuat tabel BigQuery

Pada langkah ini, Anda akan membuat tabel BigQuery dengan skema berikut:

Nama kolom Jenis data
name STRING
customer_id INTEGER

Jika Anda belum memiliki set data BigQuery, buat set data terlebih dahulu. Untuk mengetahui informasi selengkapnya, lihat Membuat set data. Kemudian, buat tabel kosong baru:

Konsol

  1. Buka halaman BigQuery.

    Buka BigQuery

  2. Di panel Explorer, luaskan project Anda, lalu pilih set data.

  3. Di bagian info Dataset, klik Buat tabel.

  4. Dalam daftar Create table from, pilih Empty table.

  5. Di kotak Table, masukkan nama tabel.

  6. Di bagian Schema, klik Edit sebagai teks.

  7. Tempel definisi skema berikut:

    name:STRING,
    customer_id:INTEGER
    
  8. Klik Create table.

gcloud

Gunakan perintah bq mk.

bq mk --table \
  PROJECT_ID:DATASET_NAME.TABLE_NAME \
  name:STRING,customer_id:INTEGER

Ganti kode berikut:

  • PROJECT_ID: project ID Anda
  • DATASET_NAME: nama set data
  • TABLE_NAME: nama tabel yang akan dibuat

Menjalankan pipeline

Jalankan pipeline streaming menggunakan template Pub/Sub Subscription to BigQuery yang disediakan Google. Pipeline ini mendapatkan data masuk dari topik Pub/Sub dan menghasilkan data ke set data BigQuery Anda.

Konsol

  1. Di konsol Google Cloud , buka halaman Jobs Dataflow.

    Buka Tugas

  2. Klik Buat tugas dari template.

  3. Masukkan Nama tugas untuk tugas Dataflow Anda.

  4. Untuk Regional endpoint, pilih region untuk tugas Dataflow Anda.

  5. Untuk Dataflow template, pilih template Pub/Sub Subscription to BigQuery.

  6. Untuk BigQuery output table, pilih Browse, lalu pilih tabel BigQuery Anda.

  7. Di daftar Pub/Sub input subscription, pilih langganan Pub/Sub.

  8. Untuk Lokasi sementara, masukkan informasi berikut:

    gs://BUCKET_NAME/temp/
    

    Ganti BUCKET_NAME dengan nama bucket Cloud Storage Anda. Folder temp menyimpan file sementara untuk tugas Dataflow.

  9. Klik Run job.

gcloud

Untuk menjalankan template di shell atau terminal, gunakan perintah gcloud dataflow jobs run.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-DATAFLOW_REGION/latest/PubSub_Subscription_to_BigQuery \
    --region DATAFLOW_REGION \
    --staging-location gs://BUCKET_NAME/temp \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME

Ganti variabel berikut:

  • JOB_NAME. nama untuk tugas
  • DATAFLOW_REGION: region untuk tugas
  • PROJECT_ID: nama Google Cloud project Anda
  • SUBSCRIPTION_ID: nama langganan Pub/Sub Anda
  • DATASET_NAME: nama set data BigQuery Anda
  • TABLE_NAME: nama tabel BigQuery Anda

Memublikasikan pesan ke Pub/Sub

Setelah tugas Dataflow dimulai, Anda dapat memublikasikan pesan ke Pub/Sub, dan pipeline akan menuliskannya ke BigQuery.

Konsol

  1. Di konsol Google Cloud , buka halaman Pub/Sub > Topics.

    Buka Topik

  2. Di daftar topik, klik nama topik Anda.

  3. Klik Pesan.

  4. Klik Publikasikan pesan.

  5. Untuk Number of messages, masukkan 10.

  6. Untuk Message body, masukkan {"name": "Alice", "customer_id": 1}.

  7. Klik Publikasikan.

gcloud

Untuk memublikasikan pesan ke topik Anda, gunakan perintah gcloud pubsub topics publish.

for run in {1..10}; do
  gcloud pubsub topics publish TOPIC_ID --message='{"name": "Alice", "customer_id": 1}'
done

Ganti TOPIC_ID dengan nama topik Anda.

Melihat hasil Anda

Lihat data yang ditulis ke tabel BigQuery Anda. Mungkin perlu waktu hingga satu menit agar data mulai muncul di tabel Anda.

Konsol

  1. Di Google Cloud konsol, buka halaman BigQuery.
    Buka halaman BigQuery

  2. Di editor kueri, jalankan kueri berikut:

    SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`
    LIMIT 1000
    

    Ganti variabel berikut:

    • PROJECT_ID: nama project Google Cloud Anda
    • DATASET_NAME: nama set data BigQuery Anda
    • TABLE_NAME: nama tabel BigQuery Anda

gcloud

Periksa hasilnya di BigQuery dengan menjalankan kueri berikut:

bq query --use_legacy_sql=false 'SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`'

Ganti variabel berikut:

  • PROJECT_ID: nama project Google Cloud Anda
  • DATASET_NAME: nama set data BigQuery Anda
  • TABLE_NAME: nama tabel BigQuery Anda

Menggunakan UDF untuk mengubah data

Tutorial ini mengasumsikan bahwa pesan Pub/Sub diformat sebagai JSON, dan skema tabel BigQuery cocok dengan data JSON.

Secara opsional, Anda dapat memberikan fungsi yang ditentukan pengguna (UDF) JavaScript yang mentransformasi data sebelum ditulis ke BigQuery. UDF dapat melakukan pemrosesan tambahan, seperti memfilter, menghapus informasi identitas pribadi (PII), atau memperkaya data dengan kolom tambahan.

Untuk mengetahui informasi selengkapnya, lihat Membuat fungsi yang ditentukan pengguna untuk template Dataflow.

Menggunakan tabel pesan yang dihentikan pengirimannya

Saat tugas berjalan, pipeline mungkin gagal menulis setiap pesan ke BigQuery. Kemungkinan error meliputi:

  • Error serialisasi, termasuk JSON yang diformat dengan buruk.
  • Error konversi jenis, yang disebabkan oleh ketidakcocokan dalam skema tabel dan data JSON.
  • Kolom tambahan dalam data JSON yang tidak ada dalam skema tabel.

Pipeline menulis error ini ke tabel yang dihentikan pengirimannya di BigQuery. Secara default, pipeline otomatis membuat tabel pesan yang dihentikan pengirimannya bernama TABLE_NAME_error_records, dengan TABLE_NAME adalah nama tabel output. Untuk menggunakan nama yang berbeda, tetapkan parameter template outputDeadletterTable.

Pembersihan

Agar tidak perlu membayar biaya pada akun Google Cloud Anda untuk resource yang digunakan dalam tutorial ini, hapus project yang berisi resource tersebut, atau simpan project dan hapus setiap resource.

Menghapus project

Cara termudah untuk menghilangkan penagihan adalah dengan menghapus Google Cloud project yang Anda buat untuk tutorial.

Konsol

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

gcloud

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

Menghapus resource satu per satu

Jika ingin menggunakan kembali project tersebut nanti, Anda dapat mempertahankan project tersebut, tetapi menghapus resource yang Anda buat selama tutorial.

Menghentikan pipeline Dataflow

Konsol

  1. Di konsol Google Cloud , buka halaman Jobs Dataflow.

    Buka Tugas

  2. Klik tugas yang ingin Anda hentikan.

    Untuk menghentikan tugas, status tugas harus berjalan.

  3. Di halaman detail tugas, klik Stop.

  4. Klik Cancel.

  5. Untuk mengonfirmasi pilihan Anda, klik Hentikan Tugas.

gcloud

Untuk membatalkan tugas Dataflow, gunakan perintah gcloud dataflow jobs.

gcloud dataflow jobs list \
  --filter 'NAME=JOB_NAME AND STATE=Running' \
  --format 'value(JOB_ID)' \
  --region "DATAFLOW_REGION" \
  | xargs gcloud dataflow jobs cancel --region "DATAFLOW_REGION"

Membersihkan Google Cloud resource project

Konsol

  1. Hapus topik dan langganan Pub/Sub.

    1. Buka halaman Topics Pub/Sub di konsol Google Cloud .

      Buka Topik

    2. Pilih topik yang Anda buat.

    3. Klik Hapus untuk menghapus topik secara permanen.

    4. Buka halaman Subscriptions Pub/Sub di konsol Google Cloud .

      Buka Langganan

    5. Pilih langganan yang dibuat dengan topik Anda.

    6. Klik Hapus untuk menghapus langganan secara permanen.

  2. Hapus tabel dan set data BigQuery.

    1. Di Google Cloud konsol, buka halaman BigQuery.

      Buka BigQuery

    2. Di panel Explorer, luaskan project Anda.

    3. Di samping set data yang ingin Anda hapus, klik Lihat tindakan, lalu klik hapus.

  3. Hapus bucket Cloud Storage.

    1. Di Google Cloud konsol, buka halaman Bucket Cloud Storage.

      Buka Buckets

    2. Pilih bucket yang ingin Anda hapus, klik Hapus, lalu ikuti petunjuknya.

gcloud

  1. Untuk menghapus langganan dan topik Pub/Sub, gunakan perintah gcloud pubsub subscriptions delete dan gcloud pubsub topics delete.

    gcloud pubsub subscriptions delete SUBSCRIPTION_ID
    gcloud pubsub topics delete TOPIC_ID
    
  2. Untuk menghapus tabel BigQuery, gunakan perintah bq rm.

    bq rm -f -t PROJECT_ID:tutorial_dataset.tutorial
    
  3. Hapus set data BigQuery. Set data saja tidak dikenai biaya apa pun.

    bq rm -r -f -d PROJECT_ID:tutorial_dataset
    
  4. Untuk menghapus bucket Cloud Storage dan objeknya, gunakan perintah gcloud storage rm. Bucket saja tidak menimbulkan biaya apa pun.

    gcloud storage rm gs://BUCKET_NAME --recursive
    

Mencabut kredensial

Konsol

Jika Anda mempertahankan project, batalkan peran yang Anda berikan ke akun layanan default Compute Engine.

  1. Di konsol Google Cloud , buka halaman IAM.

Buka IAM

  1. Pilih project, folder, atau organisasi.

  2. Cari baris yang berisi akun utama yang aksesnya ingin Anda cabut. Di baris tersebut, klik Edit akun utama.

  3. Klik tombol Hapus untuk setiap peran yang ingin Anda cabut, lalu klik Simpan.

gcloud

  • Jika Anda mempertahankan project, batalkan peran yang Anda berikan ke akun layanan default Compute Engine. Jalankan perintah berikut satu kali untuk setiap peran IAM berikut:
    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.admin
    • roles/pubsub.editor
    • roles/bigquery.dataEditor
      gcloud projects remove-iam-policy-binding <var>PROJECT_ID</var> \
      --member=serviceAccount:<var>PROJECT_NUMBER</var>-compute@developer.gserviceaccount.com \
      --role=<var>ROLE</var>
    

  • Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  • Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

Langkah berikutnya