Template Datastream ke BigQuery (Streaming)

Template Datastream ke BigQuery adalah pipeline streaming yang membaca data Datastream dan mereplikasinya ke BigQuery. Template ini membaca data dari Cloud Storage menggunakan notifikasi Pub/Sub dan mereplikasinya ke dalam tabel penyiapan BigQuery yang dipartisi menurut waktu. Setelah replikasi, template akan menjalankan MERGE di BigQuery untuk meng-upsert semua perubahan change data capture (CDC) ke dalam replika tabel sumber. Tentukan parameter gcsPubSubSubscription untuk membaca data dari notifikasi Pub/Sub, ATAU berikan parameter inputFilePattern untuk membaca data langsung dari file di Cloud Storage.

Template menangani pembuatan dan pembaruan tabel BigQuery yang dikelola oleh replikasi. Jika bahasa definisi data (DDL) diperlukan, callback ke Datastream akan mengekstrak skema tabel sumber dan menerjemahkannya ke jenis data BigQuery. Operasi yang didukung meliputi:

  • Tabel baru dibuat saat data dimasukkan.
  • Kolom baru ditambahkan ke tabel BigQuery dengan nilai awal null.
  • Kolom yang dihapus diabaikan di BigQuery dan nilai mendatang adalah null.
  • Kolom yang diganti namanya ditambahkan ke BigQuery sebagai kolom baru.
  • Perubahan jenis tidak diteruskan ke BigQuery.

Sebaiknya jalankan pipeline ini menggunakan mode streaming minimal sekali, karena template melakukan penghapusan duplikat saat menggabungkan data dari tabel BigQuery sementara ke tabel BigQuery utama. Langkah ini dalam pipeline berarti tidak ada manfaat tambahan untuk menggunakan mode streaming tepat sekali.

Persyaratan pipeline

  • Aliran Datastream yang siap atau sudah mereplikasi data.
  • Notifikasi Pub/Sub Cloud Storage diaktifkan untuk data Datastream.
  • Set data tujuan BigQuery dibuat dan Akun Layanan Compute Engine telah diberi akses administrator ke set data tersebut.
  • Primary key diperlukan di tabel sumber agar tabel replika tujuan dapat dibuat.
  • Database sumber MySQL atau Oracle. Database PostgreSQL dan SQL Server tidak didukung.

Parameter template

Parameter yang diperlukan

  • inputFilePattern: Lokasi file untuk output file Datastream di Cloud Storage, dalam format gs://<BUCKET_NAME>/<ROOT_PATH>/.
  • inputFileFormat: Format file output yang dihasilkan oleh Datastream. Nilai yang diizinkan adalah avro dan json. Setelan defaultnya adalah avro.
  • gcsPubSubSubscription: Langganan Pub/Sub yang digunakan oleh Cloud Storage untuk memberi tahu Dataflow tentang file baru yang tersedia untuk diproses, dalam format: projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>.
  • outputStagingDatasetTemplate: Nama set data yang berisi tabel penahapan. Parameter ini mendukung template, misalnya {_metadata_dataset}_log atau my_dataset_log. Biasanya, parameter ini adalah nama set data. Setelan defaultnya adalah {_metadata_dataset}. Catatan: Untuk sumber MySQL, nama database dipetakan ke {_metadata_schema}, bukan {_metadata_dataset}.
  • outputDatasetTemplate: Nama set data yang berisi tabel replika. Parameter ini mendukung template, misalnya {_metadata_dataset} atau my_dataset. Biasanya, parameter ini adalah nama set data. Setelan defaultnya adalah {_metadata_dataset}. Catatan: Untuk sumber MySQL, nama database dipetakan ke {_metadata_schema}, bukan {_metadata_dataset}.
  • deadLetterQueueDirectory: Jalur yang digunakan Dataflow untuk menulis output antrean pesan yang tidak terkirim. Jalur ini tidak boleh berada di jalur yang sama dengan output file Datastream. Nilai defaultnya adalah empty.

Parameter opsional

  • streamName: Nama atau template untuk stream yang akan di-polling untuk mendapatkan informasi skema. Nilai defaultnya adalah: {_metadata_stream}. Nilai default biasanya sudah cukup.
  • rfcStartDateTime: DateTime awal yang akan digunakan untuk mengambil data dari Cloud Storage (https://tools.ietf.org/html/rfc3339). Default: 1970-01-01T00:00:00.00Z.
  • fileReadConcurrency: Jumlah file DataStream serentak yang akan dibaca. Default-nya adalah 10.
  • outputProjectId: ID project Google Cloud yang berisi set data BigQuery untuk menghasilkan data. Nilai default untuk parameter ini adalah project tempat pipeline Dataflow berjalan.
  • outputStagingTableNameTemplate: Template yang akan digunakan untuk memberi nama tabel penahapan. Contoh, {_metadata_table}. Setelan defaultnya adalah {_metadata_table}_log.
  • outputTableNameTemplate: Template yang akan digunakan untuk nama tabel replika, misalnya {_metadata_table}. Setelan defaultnya adalah {_metadata_table}.
  • ignoreFields: Kolom yang dipisahkan koma untuk diabaikan di BigQuery. Default: _metadata_stream,_metadata_schema,_metadata_table,_metadata_source,_metadata_tx_id,_metadata_dlq_reconsumed,_metadata_primary_keys,_metadata_error,_metadata_retry_count. Misalnya, _metadata_stream,_metadata_schema.
  • mergeFrequencyMinutes: Jumlah menit antara penggabungan untuk tabel tertentu. Nilai defaultnya adalah 5.
  • dlqRetryMinutes: Jumlah menit antara Coba Ulang DLQ. Nilai defaultnya adalah 10.
  • dataStreamRootUrl: URL root Datastream API. Default ke: https://datastream.googleapis.com/.
  • applyMerge: Apakah akan menonaktifkan kueri MERGE untuk tugas. Nilai defaultnya adalah true.
  • mergeConcurrency: Jumlah kueri MERGE BigQuery serentak. Hanya efektif jika applyMerge disetel ke benar (true). Nilai defaultnya adalah 30.
  • partitionRetentionDays: Jumlah hari yang akan digunakan untuk retensi partisi saat menjalankan penggabungan BigQuery. Nilai defaultnya adalah 1.
  • useStorageWriteApiAtLeastOnce: Parameter ini hanya berlaku jika Use BigQuery Storage Write API diaktifkan. Jika true, semantik minimal sekali digunakan untuk Storage Write API. Jika tidak, semantik tepat satu kali akan digunakan. Setelan defaultnya adalah false.
  • javascriptTextTransformGcsPath: URI Cloud Storage dari file .js yang menentukan fungsi yang ditentukan pengguna (UDF) JavaScript yang akan digunakan. Contoh, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: Nama fungsi yang ditentukan pengguna (UDF) JavaScript yang akan digunakan. Misalnya, jika kode fungsi JavaScript Anda adalah myTransform(inJson) { /*...do stuff...*/ }, maka nama fungsinya adalah myTransform. Untuk contoh UDF JavaScript, lihat Contoh UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: Menentukan seberapa sering UDF dimuat ulang, dalam hitungan menit. Jika nilainya lebih besar dari 0, Dataflow akan memeriksa file UDF di Cloud Storage secara berkala, dan memuat ulang UDF jika file dimodifikasi. Parameter ini memungkinkan Anda mengupdate UDF saat pipeline sedang berjalan, tanpa perlu memulai ulang tugas. Jika nilainya adalah 0, pemuatan ulang UDF akan dinonaktifkan. Nilai defaultnya adalah 0.
  • pythonTextTransformGcsPath: Pola jalur Cloud Storage untuk kode Python yang berisi fungsi yang ditentukan pengguna. Contoh, gs://your-bucket/your-transforms/*.py.
  • pythonRuntimeVersion: Versi runtime yang akan digunakan untuk UDF Python ini.
  • pythonTextTransformFunctionName: Nama fungsi yang akan dipanggil dari file JavaScript Anda. Hanya gunakan huruf, angka, dan garis bawah. Contoh, transform_udf1.
  • runtimeRetries: Jumlah percobaan ulang runtime sebelum gagal. Nilai defaultnya adalah: 5.
  • useStorageWriteApi: Jika benar (true), pipeline akan menggunakan BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api). Nilai defaultnya adalah false. Untuk mengetahui informasi selengkapnya, lihat Menggunakan Storage Write API (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams: Saat menggunakan Storage Write API, tentukan jumlah aliran penulisan. Jika useStorageWriteApi adalah true dan useStorageWriteApiAtLeastOnce adalah false, Anda harus menetapkan parameter ini. Nilai default: 0.
  • storageWriteApiTriggeringFrequencySec: Saat menggunakan Storage Write API, menentukan frekuensi pemicuan, dalam detik. Jika useStorageWriteApi adalah true dan useStorageWriteApiAtLeastOnce adalah false, Anda harus menetapkan parameter ini.

Fungsi yang ditentukan pengguna

Secara opsional, Anda dapat memperluas template ini dengan menulis fungsi yang ditentukan pengguna (UDF). Template memanggil UDF untuk setiap elemen input. Payload elemen diserialisasi sebagai string JSON. Untuk mengetahui informasi selengkapnya, lihat Membuat fungsi yang ditentukan pengguna untuk template Dataflow.

Spesifikasi fungsi

UDF memiliki spesifikasi berikut:

  • Input: data CDC, diserialisasi sebagai string JSON.
  • Output: string JSON yang cocok dengan skema tabel tujuan BigQuery.
  • Menjalankan template

    Konsol

    1. Buka halaman Dataflow Create job from template.
    2. Buka Membuat tugas dari template
    3. Di kolom Nama tugas, masukkan nama tugas yang unik.
    4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region default-nya adalah us-central1.

      Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

    5. Dari menu drop-down Template Dataflow, pilih the Datastream to BigQuery template.
    6. Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
    7. Opsional: Untuk beralih dari pemrosesan tepat satu kali ke mode streaming minimal satu kali, pilih Minimal Satu Kali.
    8. Klik Run job.

    gcloud

    Di shell atau terminal Anda, jalankan template:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --enable-streaming-engine \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_BigQuery \
        --parameters \
    inputFilePattern=GCS_FILE_PATH,\
    gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
    outputStagingDatasetTemplate=BIGQUERY_DATASET,\
    outputDatasetTemplate=BIGQUERY_DATASET,\
    outputStagingTableNameTemplate=BIGQUERY_TABLE,\
    outputTableNameTemplate=BIGQUERY_TABLE_log
      

    Ganti kode berikut:

    • PROJECT_ID: ID Google Cloud project tempat Anda ingin menjalankan tugas Dataflow
    • JOB_NAME: nama tugas unik pilihan Anda
    • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH: jalur Cloud Storage ke data Datastream. Contoh: gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME: langganan Pub/Sub untuk membaca file yang diubah. Contoh: projects/my-project-id/subscriptions/my-subscription-id.
    • BIGQUERY_DATASET: nama set data BigQuery Anda.
    • BIGQUERY_TABLE: template tabel BigQuery Anda. Misalnya, {_metadata_schema}_{_metadata_table}_log

    API

    Untuk menjalankan template menggunakan REST API, kirim permintaan HTTP POST. Untuk mengetahui informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
    
              "inputFilePattern": "GCS_FILE_PATH",
              "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
              "outputStagingDatasetTemplate": "BIGQUERY_DATASET",
              "outputDatasetTemplate": "BIGQUERY_DATASET",
              "outputStagingTableNameTemplate": "BIGQUERY_TABLE",
              "outputTableNameTemplate": "BIGQUERY_TABLE_log"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_BigQuery",
       }
    }
      

    Ganti kode berikut:

    • PROJECT_ID: ID Google Cloud project tempat Anda ingin menjalankan tugas Dataflow
    • JOB_NAME: nama tugas unik pilihan Anda
    • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH: jalur Cloud Storage ke data Datastream. Contoh: gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME: langganan Pub/Sub untuk membaca file yang diubah. Contoh: projects/my-project-id/subscriptions/my-subscription-id.
    • BIGQUERY_DATASET: nama set data BigQuery Anda.
    • BIGQUERY_TABLE: template tabel BigQuery Anda. Misalnya, {_metadata_schema}_{_metadata_table}_log

    Langkah berikutnya