Template Streaming Data Generator ke Pub/Sub, BigQuery, dan Cloud Storage

Template Streaming Data Generator digunakan untuk menghasilkan jumlah catatan atau pesan sintetis yang tidak terbatas atau tetap berdasarkan skema yang disediakan pengguna pada kecepatan yang ditentukan. Tujuan yang kompatibel mencakup topik Pub/Sub, tabel BigQuery, dan bucket Cloud Storage.

Berikut adalah beberapa kemungkinan kasus penggunaan:

  • Simulasikan publikasi peristiwa real-time skala besar ke topik Pub/Sub untuk mengukur dan menentukan jumlah serta ukuran konsumen yang diperlukan untuk memproses peristiwa yang dipublikasikan.
  • Buat data sintetis ke tabel BigQuery atau bucket Cloud Storage untuk mengevaluasi tolok ukur performa atau berfungsi sebagai bukti konsep.

Format encoding dan sink yang didukung

Tabel berikut menjelaskan format encoding dan tujuan yang didukung oleh template ini:
JSON Avro Parquet
Pub/Sub Ya Ya Tidak
BigQuery Ya Tidak Tidak
Cloud Storage Ya Ya Ya

Persyaratan pipeline

  • Akun layanan worker memerlukan peran yang ditetapkan sebagai Dataflow Worker (roles/dataflow.worker). Untuk informasi selengkapnya, lihat Pengantar IAM.
  • Buat file skema yang berisi template JSON untuk data yang dihasilkan. Template ini menggunakan library JSON Data Generator, sehingga Anda dapat menyediakan berbagai fungsi faker untuk setiap kolom dalam skema. Untuk mengetahui informasi selengkapnya, lihat dokumentasi json-data-generator.

    Contoh:

    {
      "id": {{integer(0,1000)}},
      "name": "{{uuid()}}",
      "isInStock": {{bool()}}
    }
    
  • Upload file skema ke bucket Cloud Storage.
  • Target output harus ada sebelum eksekusi. Target harus berupa topik Pub/Sub, tabel BigQuery, atau bucket Cloud Storage, bergantung pada jenis sink.
  • Jika encoding output adalah Avro atau Parquet, buat file skema Avro dan simpan di lokasi Cloud Storage.
  • Tetapkan peran IAM tambahan untuk akun layanan pekerja, bergantung pada tujuan yang diinginkan.
    Tujuan Peran IAM yang juga diperlukan Terapkan ke resource mana
    Pub/Sub Pub/Sub Publisher (roles/pubsub.publisher)
    (Untuk mengetahui informasi selengkapnya, lihat Kontrol akses Pub/Sub dengan IAM)
    Topik Pub/Sub
    BigQuery BigQuery Data Editor (roles/bigquery.dataEditor)
    (Untuk mengetahui informasi selengkapnya, lihat Kontrol akses BigQuery dengan IAM)
    Set data BigQuery
    Cloud Storage Admin Objek Cloud Storage (roles/storage.objectAdmin)
    (Untuk mengetahui informasi selengkapnya, lihat Kontrol akses Cloud Storage dengan IAM)
    Bucket Cloud Storage

Parameter template

Parameter Deskripsi
schemaLocation Lokasi file skema. Misalnya: gs://mybucket/filename.json.
qps Jumlah pesan yang akan dipublikasikan per detik. Misalnya: 100.
sinkType (Opsional) Jenis tujuan output. Nilai yang mungkin adalah PUBSUB, BIGQUERY, GCS. Defaultnya adalah PUBSUB.
outputType (Opsional) Jenis encoding output. Nilai yang mungkin adalah JSON, AVRO, PARQUET. Default-nya adalah JSON.
avroSchemaLocation (Opsional) Lokasi file Skema AVRO. Wajib jika outputType adalah AVRO atau PARQUET. Contoh: gs://mybucket/filename.avsc.
topic (Opsional) Nama topik Pub/Sub tempat pipeline harus memublikasikan data. Wajib ada jika sinkType adalah Pub/Sub. Contoh: projects/my-project-id/topics/my-topic-id.
outputTableSpec (Opsional) Nama tabel BigQuery output. Wajib diisi jika sinkType adalah BigQuery. Contoh: my-project-ID:my_dataset_name.my-table-name.
writeDisposition (Opsional) Disposisi Penulisan BigQuery. Kemungkinan nilainya adalah WRITE_APPEND, WRITE_EMPTY, atau WRITE_TRUNCATE. Default-nya adalah WRITE_APPEND.
outputDeadletterTable (Opsional) Nama tabel BigQuery output untuk menyimpan rekaman yang gagal. Jika tidak disediakan, pipeline akan membuat tabel selama eksekusi dengan nama {output_table_name}_error_records. Misalnya: my-project-ID:my_dataset_name.my-table-name.
outputDirectory (Opsional) Jalur lokasi Cloud Storage output. Wajib diisi jika sinkType adalah Cloud Storage. Contoh: gs://mybucket/pathprefix/.
outputFilenamePrefix (Opsional) Awalan nama file dari file output yang ditulis ke Cloud Storage. Default-nya adalah output-.
windowDuration (Opsional) Interval jendela saat output ditulis ke Cloud Storage. Defaultnya adalah 1m (dengan kata lain, 1 menit).
numShards (Opsional) Jumlah maksimum shard output. Wajib diisi jika sinkType adalah Cloud Storage dan harus disetel ke 1 atau angka yang lebih tinggi.
messagesLimit (Opsional) Jumlah maksimum pesan output. Defaultnya adalah 0 yang menunjukkan tidak terbatas.
autoscalingAlgorithm (Opsional) Algoritma yang digunakan untuk penskalaan otomatis pekerja. Nilai yang mungkin adalah THROUGHPUT_BASED untuk mengaktifkan penskalaan otomatis atau NONE untuk menonaktifkan.
maxNumWorkers (Opsional) Jumlah maksimum mesin pekerja. Misalnya: 10.

Menjalankan template

Konsol

  1. Buka halaman Dataflow Create job from template.
  2. Buka Membuat tugas dari template
  3. Di kolom Nama tugas, masukkan nama tugas yang unik.
  4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region default-nya adalah us-central1.

    Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

  5. Dari menu drop-down Template Dataflow, pilih the Streaming Data Generator template.
  6. Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
  7. Klik Run job.

gcloud

Di shell atau terminal Anda, jalankan template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Streaming_Data_Generator \
    --parameters \
schemaLocation=SCHEMA_LOCATION,\
qps=QPS,\
topic=PUBSUB_TOPIC
  

Ganti kode berikut:

  • PROJECT_ID: ID Google Cloud project tempat Anda ingin menjalankan tugas Dataflow
  • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • JOB_NAME: nama tugas unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • SCHEMA_LOCATION: jalur ke file skema di Cloud Storage. Contoh: gs://mybucket/filename.json.
  • QPS: jumlah pesan yang akan dipublikasikan per detik
  • PUBSUB_TOPIC: topik Pub/Sub output. Contoh: projects/my-project-id/topics/my-topic-id.

API

Untuk menjalankan template menggunakan REST API, kirim permintaan HTTP POST. Untuk mengetahui informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "schemaLocation": "SCHEMA_LOCATION",
          "qps": "QPS",
          "topic": "PUBSUB_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Streaming_Data_Generator",
   }
}
  

Ganti kode berikut:

  • PROJECT_ID: ID Google Cloud project tempat Anda ingin menjalankan tugas Dataflow
  • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • JOB_NAME: nama tugas unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • SCHEMA_LOCATION: jalur ke file skema di Cloud Storage. Contoh: gs://mybucket/filename.json.
  • QPS: jumlah pesan yang akan dipublikasikan per detik
  • PUBSUB_TOPIC: topik Pub/Sub output. Contoh: projects/my-project-id/topics/my-topic-id.

Langkah berikutnya