Memuat data dari Cloud Storage ke BigQuery menggunakan Alur Kerja

Last reviewed 2021-05-12 UTC

Tutorial ini menunjukkan cara menjalankan alur kerja tanpa server secara andal menggunakan Workflows, Cloud Run Functions, dan Firestore untuk memuat data mentah, seperti log peristiwa, dari Cloud Storage ke BigQuery. Platform analisis biasanya memiliki alat orkestrasi untuk memuat data secara berkala di BigQuery menggunakan tugas BigQuery, lalu mentransformasi data untuk memberikan metrik bisnis menggunakan pernyataan SQL, termasuk pernyataan bahasa prosedural BigQuery. Tutorial ini ditujukan bagi developer dan arsitek yang ingin membangun pipeline pemrosesan data berbasis peristiwa tanpa server. Tutorial ini mengasumsikan bahwa Anda sudah memahami YAML, SQL, dan Python.

Arsitektur

Diagram berikut menunjukkan arsitektur tingkat tinggi pipeline ekstraksi, pemuatan, dan transformasi (ELT) serverless menggunakan Workflows.

Pipeline ekstraksi, pemuatan, dan transformasi.

Dalam diagram sebelumnya, pertimbangkan platform retail yang secara berkala mengumpulkan peristiwa penjualan sebagai file dari berbagai toko, lalu menulis file tersebut ke bucket Cloud Storage. Peristiwa digunakan untuk memberikan metrik bisnis dengan mengimpor dan memproses di BigQuery. Arsitektur ini menyediakan sistem orkestrasi serverless yang andal untuk mengimpor file Anda ke BigQuery, dan dibagi menjadi dua modul berikut:

  • Daftar file: Memelihara daftar file yang belum diproses yang ditambahkan ke bucket Cloud Storage dalam koleksi Firestore. Modul ini berfungsi melalui Cloud Run Function yang dipicu oleh peristiwa penyimpanan Object Finalize, yang dihasilkan saat file baru ditambahkan ke bucket Cloud Storage. Nama file ditambahkan ke array files dari koleksi bernama new di Firestore.
  • Alur kerja: Menjalankan alur kerja terjadwal. Cloud Scheduler memicu alur kerja yang menjalankan serangkaian langkah sesuai dengan sintaksis berbasis YAML untuk mengatur pemuatan, lalu mengubah data di BigQuery dengan memanggil fungsi Cloud Run. Langkah-langkah dalam alur kerja memanggil Cloud Run Function untuk menjalankan tugas berikut:

    • Buat dan mulai tugas pemuatan BigQuery.
    • Polling status tugas pemuatan.
    • Buat dan mulai tugas kueri transformasi.
    • Melakukan polling status tugas transformasi.

Menggunakan transaksi untuk mempertahankan daftar file baru di Firestore membantu memastikan tidak ada file yang terlewat saat alur kerja mengimpornya ke BigQuery. Eksekusi alur kerja yang terpisah dibuat idempoten dengan menyimpan metadata dan status tugas di Firestore.

Tujuan

  • Buat database Firestore.
  • Siapkan pemicu fungsi Cloud Run untuk melacak file yang ditambahkan ke bucket Cloud Storage di Firestore.
  • Men-deploy fungsi Cloud Run untuk menjalankan dan memantau tugas BigQuery.
  • Deploy dan jalankan alur kerja untuk mengotomatiskan proses.

Biaya

Dalam dokumen ini, Anda akan menggunakan komponen Google Cloudyang dapat ditagih berikut:

Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda, gunakan kalkulator harga.

Pengguna Google Cloud baru mungkin memenuhi syarat untuk mendapatkan uji coba gratis.

Setelah menyelesaikan tugas yang dijelaskan dalam dokumen ini, Anda dapat menghindari penagihan berkelanjutan dengan menghapus resource yang Anda buat. Untuk mengetahui informasi selengkapnya, lihat Pembersihan.

Sebelum memulai

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Enable the Cloud Build, Cloud Run functions, Identity and Access Management, Resource Manager, and Workflows APIs.

    Enable the APIs

  4. Buka halaman Welcome dan catat Project ID yang akan digunakan pada langkah berikutnya.

    Buka halaman Selamat datang

  5. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

Menyiapkan lingkungan Anda

Untuk menyiapkan lingkungan, buat database Firestore, clone contoh kode dari repositori GitHub, buat resource menggunakan Terraform, edit file YAML Workflows, dan instal persyaratan untuk generator file.

  1. Untuk membuat database Firestore, lakukan langkah-langkah berikut:

    1. Di konsol Google Cloud , buka halaman Firestore.

      Buka Firestore

    2. Klik Pilih mode native.

    3. Di menu Pilih lokasi, pilih region tempat Anda ingin menghosting database Firestore. Sebaiknya pilih region yang dekat dengan lokasi fisik Anda.

    4. Klik Buat database.

  2. Pada Cloud Shell, clone repositori sumber:

    cd $HOME && git clone https://github.com/GoogleCloudPlatform/workflows-demos
    cd workflows-demos/workflows-bigquery-load
    
  3. Di Cloud Shell, buat resource berikut menggunakan Terraform:

    terraform init
    terraform apply \
        -var project_id=PROJECT_ID \
        -var region=REGION \
        -var zone=ZONE \
        --auto-approve
    

    Ganti kode berikut:

    • PROJECT_ID: Google Cloud project ID Anda
    • REGION: lokasi geografis Google Cloud tertentu untuk menghosting resource Anda—misalnya, us-central1
    • ZONE: lokasi dalam region untuk menghosting resource Anda—misalnya, us-central1-b

    Anda akan melihat pesan yang mirip dengan berikut: Apply complete! Resources: 7 added, 0 changed, 1 destroyed.

    Terraform dapat membantu Anda membuat, mengubah, dan mengupgrade infrastruktur dalam skala besar dengan aman dan terprediksi. Resource berikut dibuat di project Anda:

    • Akun layanan dengan hak istimewa yang diperlukan untuk memastikan akses yang aman ke resource Anda.
    • Set data BigQuery bernama serverless_elt_dataset dan tabel bernama word_count untuk memuat file yang masuk.
    • Bucket Cloud Storage bernama ${project_id}-ordersbucket untuk melakukan staging file input.
    • Lima Cloud Run function berikut:
      • file_add_handler menambahkan nama file yang ditambahkan ke bucket Cloud Storage ke koleksi Firestore.
      • create_job membuat tugas pemuatan BigQuery baru dan mengaitkan file dalam koleksi Firebase dengan tugas tersebut.
      • create_query membuat tugas kueri BigQuery baru.
      • poll_bigquery_job mendapatkan status tugas BigQuery.
      • run_bigquery_job memulai tugas BigQuery.
  4. Dapatkan URL untuk fungsi Cloud Run create_job, create_query, poll_job, dan run_bigquery_job yang Anda deploy pada langkah sebelumnya.

    gcloud functions describe create_job | grep url
    gcloud functions describe poll_bigquery_job | grep url
    gcloud functions describe run_bigquery_job | grep url
    gcloud functions describe create_query | grep url
    

    Outputnya mirip dengan hal berikut ini:

    url: https://REGION-PROJECT_ID.cloudfunctions.net/create_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/poll_bigquery_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/run_bigquery_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/create_query
    

    Catat URL ini karena diperlukan saat Anda men-deploy alur kerja.

Membuat dan men-deploy alur kerja

  1. Di Cloud Shell, buka file sumber untuk alur kerja, workflow.yaml:

    main:
      steps:
        - constants:
            assign:
              - create_job_url: CREATE_JOB_URL
              - poll_job_url: POLL_BIGQUERY_JOB_URL
              - run_job_url: RUN_BIGQUERY_JOB_URL
              - create_query_url: CREATE_QUERY_URL
              - region: BQ_REGION
              - table_name: BQ_DATASET_TABLE_NAME
            next: createJob
    
        - createJob:
            call: http.get
            args:
              url: ${create_job_url}
              auth:
                  type: OIDC
              query:
                  region: ${region}
                  table_name: ${table_name}
            result: job
            next: setJobId
    
        - setJobId:
            assign:
              - job_id: ${job.body.job_id}
            next: jobCreateCheck
    
        - jobCreateCheck:
            switch:
              - condition: ${job_id == Null}
                next: noOpJob
            next: runLoadJob
    
        - runLoadJob:
            call: runBigQueryJob
            args:
                job_id: ${job_id}
                run_job_url: ${run_job_url}
                poll_job_url: ${poll_job_url}
            result: jobStatus
            next: loadRunCheck
    
        - loadRunCheck:
            switch:
              - condition: ${jobStatus == 2}
                next: createQueryJob
            next: failedLoadJob
    
        - createQueryJob:
            call: http.get
            args:
              url: ${create_query_url}
              query:
                  qs: "select count(*) from serverless_elt_dataset.word_count"
                  region: "US"
              auth:
                  type: OIDC
            result: queryjob
            next: setQueryJobId
    
        - setQueryJobId:
            assign:
              - qid: ${queryjob.body.job_id}
            next: queryCreateCheck
    
        - queryCreateCheck:
            switch:
              - condition: ${qid == Null}
                next: failedQueryJob
            next: runQueryJob
    
        - runQueryJob:
            call: runBigQueryJob
            args:
              job_id: ${qid}
              run_job_url: ${run_job_url}
              poll_job_url: ${poll_job_url}
            result: queryJobState
            next: runQueryCheck
    
        - runQueryCheck:
            switch:
              - condition: ${queryJobState == 2}
                next: allDone
            next: failedQueryJob
    
        - noOpJob:
            return: "No files to import"
            next: end
    
        - allDone:
            return: "All done!"
            next: end
    
        - failedQueryJob:
            return: "Query job failed"
            next: end
    
        - failedLoadJob:
            return: "Load job failed"
            next: end
    
    
    runBigQueryJob:
      params: [job_id, run_job_url, poll_job_url]
      steps:
        - startBigQueryJob:
            try:
              call: http.get
              args:
                  url: ${run_job_url}
                  query:
                    job_id: ${job_id}
                  auth:
                    type: OIDC
                  timeout: 600
              result: submitJobState
            retry: ${http.default_retry}
            next: validateSubmit
    
        - validateSubmit:
            switch:
              - condition: ${submitJobState.body.status == 1}
                next: sleepAndPollLoad
            next: returnState
    
        - returnState:
            return: ${submitJobState.body.status}
    
        - sleepAndPollLoad:
            call: sys.sleep
            args:
              seconds: 5
            next: pollJob
    
        - pollJob:
            try:
              call: http.get
              args:
                url: ${poll_job_url}
                query:
                  job_id: ${job_id}
                auth:
                  type: OIDC
                timeout: 600
              result: pollJobState
            retry:
              predicate: ${http.default_retry_predicate}
              max_retries: 10
              backoff:
                initial_delay: 1
                max_delay: 60
                multiplier: 2
            next: stateCheck
    
        - stateCheck:
            switch:
              - condition: ${pollJobState.body.status == 2}
                return: ${pollJobState.body.status}
              - condition: ${pollJobState.body.status == 3}
                return: ${pollJobState.body.status}
            next: sleepAndPollLoad

    Ganti kode berikut:

    • CREATE_JOB_URL: URL fungsi untuk membuat tugas baru
    • POLL_BIGQUERY_JOB_URL: URL fungsi untuk melakukan polling status tugas yang sedang berjalan
    • RUN_BIGQUERY_JOB_URL: URL fungsi untuk memulai tugas pemuatan BigQuery
    • CREATE_QUERY_URL: URL fungsi untuk memulai tugas kueri BigQuery
    • BQ_REGION: region BigQuery tempat data disimpan—misalnya, US
    • BQ_DATASET_TABLE_NAME: nama tabel set data BigQuery dalam format PROJECT_ID.serverless_elt_dataset.word_count
  2. Deploy file workflow:

    gcloud workflows deploy WORKFLOW_NAME \
        --location=WORKFLOW_REGION \
        --description='WORKFLOW_DESCRIPTION' \
        --service-account=workflow-runner@PROJECT_ID. \
        --source=workflow.yaml
    

    Ganti kode berikut:

    • WORKFLOW_NAME: nama unik alur kerja
    • WORKFLOW_REGION: region tempat alur kerja di-deploy—misalnya, us-central1
    • WORKFLOW_DESCRIPTION: deskripsi alur kerja
  3. Buat lingkungan virtual Python 3 dan instal persyaratan untuk generator file:

    sudo apt-get install -y python3-venv
    python3 -m venv env
    . env/bin/activate
    cd generator
    pip install -r requirements.txt
    

Membuat file untuk diimpor

Skrip Python gen.py menghasilkan konten acak dalam format Avro. Skemanya sama dengan tabel word_count BigQuery. File Avro ini disalin ke bucket Cloud Storage yang ditentukan.

Di Cloud Shell, buat file:

python gen.py -p PROJECT_ID \
    -o PROJECT_ID-ordersbucket \
    -n RECORDS_PER_FILE \
    -f NUM_FILES \
    -x FILE_PREFIX

Ganti kode berikut:

  • RECORDS_PER_FILE: jumlah data dalam satu file
  • NUM_FILES: jumlah total file yang akan diupload
  • FILE_PREFIX: awalan untuk nama file yang dihasilkan

Melihat entri file di Firestore

Saat file disalin ke Cloud Storage, fungsi Cloud Run handle_new_file dipicu. Fungsi ini menambahkan daftar file ke array daftar file dalam dokumen new di koleksi jobs Firestore.

Untuk melihat daftar file, di konsol Google Cloud , buka halaman Data Firestore.

Buka Data

Daftar file yang ditambahkan ke koleksi.

Memicu alur kerja

Workflows menghubungkan serangkaian tugas serverless dari Google Cloud dan layanan API. Setiap langkah dalam alur kerja ini berjalan sebagai fungsi Cloud Run dan statusnya disimpan di Firestore. Semua panggilan ke fungsi Cloud Run diautentikasi menggunakan akun layanan alur kerja.

Di Cloud Shell, jalankan alur kerja:

gcloud workflows execute WORKFLOW_NAME

Diagram berikut menunjukkan langkah-langkah yang digunakan dalam alur kerja:

Langkah-langkah yang digunakan dalam alur kerja utama dan sub-alur kerja.

Alur kerja dibagi menjadi dua bagian: alur kerja utama dan sub-alur kerja. Alur kerja utama menangani pembuatan tugas dan eksekusi bersyarat, sedangkan sub-alur kerja menjalankan tugas BigQuery. Alur kerja ini melakukan operasi berikut:

  • Fungsi create_job Cloud Run membuat objek tugas baru, mendapatkan daftar file yang ditambahkan ke Cloud Storage dari dokumen Firestore, dan mengaitkan file dengan tugas pemuatan. Jika tidak ada file yang akan dimuat, fungsi tidak akan membuat tugas baru.
  • Fungsi Cloud Run create_query mengambil kueri yang perlu dieksekusi bersama dengan region BigQuery tempat kueri harus dieksekusi. Fungsi ini membuat tugas di Firestore dan menampilkan ID tugas.
  • Fungsi Cloud Run run_bigquery_job mendapatkan ID tugas yang perlu dijalankan, lalu memanggil BigQuery API untuk mengirimkan tugas.
  • Daripada menunggu hingga tugas selesai di fungsi Cloud Run, Anda dapat melakukan polling status tugas secara berkala.
    • Fungsi Cloud Run poll_bigquery_job memberikan status tugas. Callback ini dipanggil berulang kali hingga tugas selesai.
    • Untuk menambahkan penundaan antara panggilan ke fungsi Cloud Run poll_bigquery_job, rutinitas sleep dipanggil dari Workflows.

Melihat status tugas

Anda dapat melihat daftar file dan status tugas.

  1. Di konsolGoogle Cloud , buka halaman Data Firestore.

    Buka Data

  2. ID unik (UUID) dibuat untuk setiap tugas. Untuk melihat job_type dan status, klik ID tugas. Setiap tugas dapat memiliki salah satu jenis dan status berikut:

    • job_type: Jenis tugas yang dijalankan oleh alur kerja dengan salah satu nilai berikut:

      • 0: Muat data ke BigQuery.
      • 1: Jalankan kueri di BigQuery.
    • status: Status tugas saat ini dengan salah satu nilai berikut:

      • 0: Tugas telah dibuat, tetapi belum dimulai.
      • 1: Tugas sedang berjalan.
      • 2: Tugas berhasil menyelesaikan eksekusinya.
      • 3: Terjadi error dan tugas tidak berhasil diselesaikan.

    Objek tugas juga berisi atribut metadata seperti region set data BigQuery, nama tabel BigQuery, dan jika merupakan tugas kueri, string kueri yang sedang dijalankan.

Daftar file dengan status tugas yang ditandai.

Melihat data di BigQuery

Untuk mengonfirmasi bahwa tugas ELT berhasil, pastikan data muncul di tabel.

  1. Di Google Cloud konsol, buka halaman Editor BigQuery.

    Buka Editor

  2. Klik tabel serverless_elt_dataset.word_count.

  3. Klik tab Pratinjau.

    Tab Preview yang menampilkan data dalam tabel.

Menjadwalkan alur kerja

Untuk menjalankan alur kerja secara berkala sesuai jadwal, Anda dapat menggunakan Cloud Scheduler.

Pembersihan

Cara termudah untuk menghilangkan penagihan adalah dengan menghapus Google Cloud project yang Anda buat untuk tutorial. Atau, Anda dapat menghapus resource satu per satu.

Menghapus resource satu per satu

  1. Di Cloud Shell, hapus semua resource yang dibuat menggunakan Terraform:

    cd $HOME/bigquery-workflows-load
    terraform destroy \
    -var project_id=PROJECT_ID \
    -var region=REGION \
    -var zone=ZONE \
    --auto-approve
    
  2. Di konsol Google Cloud , buka halaman Data Firestore.

    Buka Data

  3. Di samping Jobs, klik Menu, lalu pilih Delete.

    Jalur menu untuk menghapus koleksi.

Menghapus project

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Langkah berikutnya