Template Pub/Sub ke MongoDB

Template Pub/Sub to MongoDB adalah pipeline streaming yang membaca pesan berenkode JSON dari langganan Pub/Sub dan menuliskannya ke MongoDB sebagai dokumen. Jika diperlukan, pipeline ini mendukung transformasi tambahan yang dapat disertakan menggunakan fungsi yang ditentukan pengguna (UDF) JavaScript.

Jika terjadi error saat memproses kumpulan data, template akan menuliskannya ke tabel BigQuery, beserta pesan input. Misalnya, error dapat terjadi karena ketidakcocokan skema, JSON yang salah format, atau saat menjalankan transformasi. Tentukan nama tabel di parameter deadletterTable. Jika tabel tidak ada, pipeline akan otomatis membuatnya.

Persyaratan pipeline

  • Langganan Pub/Sub harus ada dan pesan harus dienkode dalam format JSON yang valid.
  • Cluster MongoDB harus ada dan harus dapat diakses dari mesin pekerja Dataflow.

Parameter template

Parameter yang diperlukan

  • inputSubscription: Nama langganan Pub/Sub. Contoh, projects/your-project-id/subscriptions/your-subscription-name.
  • mongoDBUri: Daftar server MongoDB yang dipisahkan koma. Contoh, host1:port,host2:port,host3:port.
  • database: Database di MongoDB untuk menyimpan koleksi. Contoh, my-db.
  • collection: Nama koleksi dalam database MongoDB. Contoh, my-collection.
  • deadletterTable: Tabel BigQuery yang menyimpan pesan yang disebabkan oleh kegagalan, seperti skema yang tidak cocok, JSON yang salah format, dan sebagainya. Contoh, your-project-id:your-dataset.your-table-name.

Parameter opsional

  • batchSize: Ukuran batch yang digunakan untuk penyisipan batch dokumen ke dalam MongoDB. Default-nya adalah: 1000.
  • batchSizeBytes: Ukuran batch dalam byte. Nilai defaultnya adalah: 5242880.
  • maxConnectionIdleTime: Waktu tidak ada aktivitas maksimum yang diizinkan dalam detik sebelum waktu tunggu koneksi habis. Default: 60000.
  • sslEnabled: Nilai boolean yang menunjukkan apakah koneksi ke MongoDB diaktifkan untuk SSL. Nilai defaultnya adalah: true.
  • ignoreSSLCertificate: Nilai boolean yang menunjukkan apakah sertifikat SSL harus diabaikan. Nilai defaultnya adalah: true.
  • withOrdered: Nilai Boolean yang memungkinkan penyisipan massal yang diurutkan ke dalam MongoDB. Nilai defaultnya adalah: true.
  • withSSLInvalidHostNameAllowed: Nilai boolean yang menunjukkan apakah nama host yang tidak valid diizinkan untuk koneksi SSL. Nilai defaultnya adalah: true.
  • javascriptTextTransformGcsPath: URI Cloud Storage dari file .js yang menentukan fungsi yang ditentukan pengguna (UDF) JavaScript yang akan digunakan. Contoh, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: Nama fungsi yang ditentukan pengguna (UDF) JavaScript yang akan digunakan. Misalnya, jika kode fungsi JavaScript Anda adalah myTransform(inJson) { /*...do stuff...*/ }, maka nama fungsinya adalah myTransform. Untuk contoh UDF JavaScript, lihat Contoh UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: Menentukan seberapa sering UDF dimuat ulang, dalam hitungan menit. Jika nilainya lebih besar dari 0, Dataflow akan memeriksa file UDF di Cloud Storage secara berkala, dan memuat ulang UDF jika file dimodifikasi. Parameter ini memungkinkan Anda mengupdate UDF saat pipeline sedang berjalan, tanpa perlu memulai ulang tugas. Jika nilainya adalah 0, pemuatan ulang UDF akan dinonaktifkan. Nilai defaultnya adalah 0.

Fungsi yang ditentukan pengguna

Secara opsional, Anda dapat memperluas template ini dengan menulis fungsi yang ditentukan pengguna (UDF). Template memanggil UDF untuk setiap elemen input. Payload elemen diserialisasi sebagai string JSON. Untuk mengetahui informasi selengkapnya, lihat Membuat fungsi yang ditentukan pengguna untuk template Dataflow.

Spesifikasi fungsi

UDF memiliki spesifikasi berikut:

  • Input: satu baris dari file CSV input.
  • Output: dokumen JSON yang diubah menjadi string untuk disisipkan ke MongoDB.

Menjalankan template

Konsol

  1. Buka halaman Dataflow Create job from template.
  2. Buka Membuat tugas dari template
  3. Di kolom Nama tugas, masukkan nama tugas yang unik.
  4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region default-nya adalah us-central1.

    Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

  5. Dari menu drop-down Template Dataflow, pilih the Pub/Sub to MongoDB template.
  6. Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
  7. Klik Run job.

gcloud

Di shell atau terminal Anda, jalankan template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

Ganti kode berikut:

  • PROJECT_ID: ID Google Cloud project tempat Anda ingin menjalankan tugas Dataflow
  • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • JOB_NAME: nama tugas unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • INPUT_SUBSCRIPTION: langganan Pub/Sub (misalnya, projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: alamat server MongoDB (misalnya, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: nama database MongoDB (misalnya, users)
  • COLLECTION: nama koleksi MongoDB (misalnya, profiles)
  • UNPROCESSED_TABLE: nama tabel BigQuery (misalnya, your-project:your-dataset.your-table-name)

API

Untuk menjalankan template menggunakan REST API, kirim permintaan HTTP POST. Untuk mengetahui informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

Ganti kode berikut:

  • PROJECT_ID: ID Google Cloud project tempat Anda ingin menjalankan tugas Dataflow
  • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • JOB_NAME: nama tugas unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • INPUT_SUBSCRIPTION: langganan Pub/Sub (misalnya, projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: alamat server MongoDB (misalnya, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: nama database MongoDB (misalnya, users)
  • COLLECTION: nama koleksi MongoDB (misalnya, profiles)
  • UNPROCESSED_TABLE: nama tabel BigQuery (misalnya, your-project:your-dataset.your-table-name)

Langkah berikutnya