Melakukan streaming pesan Pub/Sub melalui WebSockets


Tutorial ini mengilustrasikan cara aplikasi frontend—dalam hal ini, halaman web—menangani data masuk bervolume tinggi saat Anda menggunakan Google Cloud. Tutorial ini menjelaskan beberapa tantangan streaming bervolume tinggi. Aplikasi contoh disediakan bersama tutorial ini yang mengilustrasikan cara menggunakan WebSockets untuk memvisualisasikan aliran pesan padat yang dipublikasikan ke topik Pub/Sub, memprosesnya secara tepat waktu yang mempertahankan frontend berperforma tinggi.

Tutorial ini ditujukan bagi developer yang sudah memahami komunikasi browser-ke-server melalui HTTP dan cara menulis aplikasi frontend menggunakan HTML, CSS, dan JavaScript. Tutorial ini mengasumsikan bahwa Anda memiliki beberapa pengalaman dengan Google Cloud dan sudah memahami alat command line Linux.

Tujuan

  • Buat dan konfigurasi instance virtual machine (VM) dengan komponen yang diperlukan untuk melakukan streaming payload langganan Pub/Sub ke klien browser.
  • Konfigurasi proses di VM untuk berlangganan topik Pub/Sub dan membuat output setiap pesan ke log.
  • Instal server web untuk menayangkan konten statis dan mengalirkan output perintah shell ke klien WebSocket.
  • Visualisasikan agregasi streaming WebSocket dan sampel pesan individual di browser menggunakan HTML, CSS, dan JavaScript.

Biaya

Dalam dokumen ini, Anda akan menggunakan komponen Google Cloudyang dapat ditagih berikut:

Untuk membuat perkiraan biaya berdasarkan proyeksi penggunaan Anda, gunakan kalkulator harga.

Pengguna Google Cloud baru mungkin memenuhi syarat untuk mendapatkan uji coba gratis.

Sebelum memulai

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Buka Cloud Shell untuk menjalankan perintah yang tercantum dalam tutorial ini.

    BUKA Cloud Shell

    Anda menjalankan semua perintah terminal dalam tutorial ini dari Cloud Shell.

  7. Aktifkan Compute Engine API dan Pub/Sub API:
    gcloud services enable compute pubsub
  8. Setelah menyelesaikan tutorial ini, Anda dapat menghindari penagihan berkelanjutan dengan menghapus resource yang Anda buat. Lihat Pembersihan untuk mengetahui detail selengkapnya.

Pengantar

Seiring makin banyaknya aplikasi yang mengadopsi model berbasis peristiwa, penting bagi aplikasi frontend untuk dapat membuat koneksi yang sederhana dan lancar ke layanan pesan yang menjadi fondasi arsitektur ini.

Ada beberapa opsi untuk melakukan streaming data ke klien browser web; yang paling umum adalah WebSockets. Tutorial ini memandu Anda menginstal proses yang berlangganan aliran pesan yang dipublikasikan ke topik Pub/Sub, dan merutekan pesan tersebut melalui server web dalam perjalanan ke klien yang terhubung melalui WebSockets.

Untuk tutorial ini, Anda akan menggunakan topik Pub/Sub yang tersedia secara publik dan digunakan dalam NYC Taxi Tycoon Google Dataflow CodeLab. Topik ini memberi Anda aliran real-time telemetri taksi simulasi berdasarkan data perjalanan historis yang diambil di New York City dari set data catatan perjalanan Taxi & Limousine Commission.

Arsitektur

Diagram berikut menunjukkan arsitektur tutorial yang Anda buat dalam tutorial ini.

Arsitektur tutorial

Diagram menunjukkan penerbit pesan yang berada di luar project yang berisi resource Compute Engine; penerbit mengirim pesan ke topik Pub/Sub. Instance Compute Engine membuat pesan tersedia melalui WebSockets untuk browser yang menjalankan dasbor berdasarkan HTML5 dan JavaScript.

Tutorial ini menggunakan kombinasi alat untuk menghubungkan Pub/Sub dan Websocket:

  • pulltop adalah program Node.js yang Anda instal sebagai bagian dari tutorial ini. Alat ini berlangganan topik Pub/Sub dan melakukan streaming pesan yang diterima ke output standar.
  • websocketd adalah alat command line kecil yang membungkus program antarmuka command line yang ada dan memungkinkan program tersebut diakses menggunakan WebSocket.

Dengan menggabungkan pulltop dan websocketd, Anda dapat melakukan streaming pesan yang diterima dari topik Pub/Sub ke browser menggunakan WebSockets.

Menyesuaikan throughput topik Pub/Sub

Topik Pub/Sub publik NYC Taxi Tycoon menghasilkan 2.000 hingga 2.500 update perjalanan taksi simulasi per detik—hingga 8 Mb atau lebih per detik. Kontrol alur bawaan di Pub/Sub memperlambat laju pesan pelanggan secara otomatis jika Pub/Sub mendeteksi antrean pesan yang tidak dikonfirmasi yang terus bertambah. Oleh karena itu, Anda mungkin melihat variabilitas kecepatan pesan yang tinggi di berbagai workstation, koneksi jaringan, dan kode pemrosesan front-end.

Pemrosesan pesan browser yang efektif

Mengingat volume pesan yang tinggi yang masuk melalui streaming WebSocket, Anda harus berhati-hati dalam menulis kode frontend yang memproses streaming ini. Misalnya, Anda dapat membuat elemen HTML secara dinamis untuk setiap pesan. Namun, pada kecepatan pesan yang diharapkan, memperbarui halaman untuk setiap pesan dapat mengunci jendela browser. Alokasi memori yang sering terjadi akibat pembuatan elemen HTML secara dinamis juga memperpanjang durasi pembersihan sampah memori, sehingga menurunkan kualitas pengalaman pengguna. Singkatnya, Anda tidak ingin memanggil document.createElement() untuk setiap sekitar 2.000 pesan yang tiba setiap detik.

Pendekatan yang dilakukan oleh tutorial ini untuk mengelola aliran pesan yang padat ini adalah sebagai berikut:

  • Menghitung dan terus memperbarui serangkaian metrik streaming secara real time, menampilkan sebagian besar informasi tentang pesan yang diamati sebagai nilai gabungan.
  • Gunakan dasbor berbasis browser untuk memvisualisasikan sampel kecil pesan individual pada jadwal yang telah ditentukan sebelumnya, yang hanya menampilkan peristiwa pembatalan dan pengambilan secara real time.

Gambar berikut menunjukkan dasbor yang dibuat sebagai bagian dari tutorial ini.

Dasbor yang dibuat di halaman web oleh kode dalam tutorial ini

Gambar tersebut menggambarkan latensi pesan terakhir sebesar 24 milidetik pada kecepatan hampir 2.100 pesan per detik. Jika jalur kode penting untuk memproses setiap pesan individual tidak selesai tepat waktu, jumlah pesan yang diamati per detik akan berkurang seiring peningkatan latensi pesan terakhir. Pengambilan sampel perjalanan dilakukan menggunakan JavaScript setInterval API yang disetel untuk berputar sekali setiap tiga detik, yang mencegah frontend membuat sejumlah besar elemen DOM selama masa pakainya. (Sebagian besar praktis tidak dapat diamati pada kecepatan lebih tinggi dari 10 per detik.)

Dasbor mulai memproses peristiwa di tengah aliran, sehingga perjalanan yang sudah berlangsung dikenali sebagai perjalanan baru oleh dasbor, kecuali jika perjalanan tersebut telah dilihat sebelumnya. Kode menggunakan array asosiatif untuk menyimpan setiap perjalanan yang diamati, yang diindeks berdasarkan nilai ride_id, dan menghapus referensi ke perjalanan tertentu saat penumpang telah diantar. Perjalanan dalam status "dalam perjalanan" atau "penjemputan" menambahkan referensi ke array tersebut kecuali (untuk kasus "dalam perjalanan") perjalanan telah diamati sebelumnya.

Menginstal dan mengonfigurasi server WebSocket

Untuk memulai, Anda membuat instance Compute Engine yang akan digunakan sebagai server WebSocket. Setelah membuat instance, Anda menginstal alat di instance tersebut yang akan Anda butuhkan nanti.

  1. Di Cloud Shell, tetapkan zona Compute Engine default. Contoh berikut menunjukkan us-central1-a, tetapi Anda dapat menggunakan zona apa pun yang Anda inginkan.

    gcloud config set compute/zone us-central1-a
    
  2. Buat instance Compute Engine bernama websocket-server di zona default:

    gcloud compute instances create websocket-server --tags wss
    
  3. Tambahkan aturan firewall yang mengizinkan traffic TCP di port 8000 ke instance mana pun yang diberi tag wss:

    gcloud compute firewall-rules create websocket \
        --direction=IN \
        --allow=tcp:8000 \
        --target-tags=wss
    
  4. Jika Anda menggunakan project yang sudah ada, pastikan port TCP 22 terbuka untuk mengizinkan konektivitas SSH ke instance.

    Secara default, aturan firewall default-allow-ssh diaktifkan di jaringan default. Namun, jika Anda atau administrator Anda menghapus aturan default dalam project yang sudah ada, port TCP 22 mungkin tidak terbuka. (Jika Anda membuat project baru untuk tutorial ini, aturan akan diaktifkan secara default, dan Anda tidak perlu melakukan apa pun.)

    Tambahkan aturan firewall yang mengizinkan traffic TCP di port 22 ke instance mana pun yang diberi tag wss:

    gcloud compute firewall-rules create wss-ssh \
        --direction=IN \
        --allow=tcp:22 \
        --target-tags=wss
    
  5. Hubungkan ke instance menggunakan SSH:

    gcloud compute ssh websocket-server
    
  6. Pada perintah terminal instance, beralihlah akun ke root agar Anda dapat menginstal software:

    sudo -s
    
  7. Instal alat git dan unzip:

    apt-get install -y unzip git
    
  8. Instal biner websocketd di instance:

    cd /var/tmp/
    wget \
    https://github.com/joewalnes/websocketd/releases/download/v0.3.0/websocketd-0.3.0-linux_386.zip
    unzip websocketd-0.3.0-linux_386.zip
    mv websocketd /usr/bin
    

Menginstal Node.js dan kode tutorial

  1. Di terminal pada instance, instal Node.js:

    curl -sL https://deb.nodesource.com/setup_10.x | bash -
    apt-get install -y nodejs
    
  2. Download repositori sumber tutorial:

    exit
    cd ~
    git clone https://github.com/GoogleCloudPlatform/solutions-pubsub-websockets.git
    
  3. Ubah izin pada pulltop untuk mengizinkan eksekusi:

    cd solutions-pubsub-websockets
    chmod 755 pulltop/pulltop.js
    
  4. Instal dependensi pulltop:

    cd pulltop
    npm install
    sudo npm link
    

Menguji apakah pulltop dapat membaca pesan

  1. Di instance, jalankan pulltop terhadap topik publik:

    pulltop projects/pubsub-public-data/topics/taxirides-realtime
    

    Jika pulltop berfungsi, Anda akan melihat aliran hasil seperti berikut:

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_stat
    us":"enroute","passenger_count":1}
  2. Tekan Ctrl+C untuk menghentikan streaming.

Membangun aliran pesan ke websocketd

Setelah memastikan bahwa pulltop dapat membaca topik Pub/Sub, Anda dapat memulai proses websocketd untuk mulai mengirim pesan ke browser.

Merekam pesan topik ke file lokal

Untuk tutorial ini, Anda akan merekam aliran pesan yang Anda dapatkan dari pulltop dan menuliskannya ke file lokal. Merekam traffic pesan ke file lokal akan menambah persyaratan penyimpanan, tetapi juga memisahkan operasi proses websocketd dari pesan topik Pub/Sub streaming. Merekam informasi secara lokal memungkinkan skenario saat Anda mungkin ingin menghentikan sementara streaming Pub/Sub (mungkin untuk menyesuaikan parameter kontrol alur) tetapi tidak memaksa reset klien WebSocket yang saat ini terhubung. Saat aliran pesan dibuat ulang, websocketd akan otomatis melanjutkan streaming pesan ke klien.

  1. Di instance, jalankan pulltop terhadap topik publik, dan alihkan output pesan ke file taxi.json lokal. Perintah nohup menginstruksikan OS untuk menjalankan proses pulltop jika Anda logout atau menutup terminal.

    nohup pulltop \
      projects/pubsub-public-data/topics/taxirides-realtime > \
      /var/tmp/taxi.json &
    
  2. Verifikasi bahwa pesan JSON sedang ditulis ke file:

    tail /var/tmp/taxi.json
    

    Jika pesan sedang ditulis ke file taxi.json, output-nya akan mirip dengan yang berikut ini:

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_sta
    tus":"enroute","passenger_count":1}
  3. Ubah ke folder web aplikasi Anda:

    cd ../web
    
  4. Mulai websocketd untuk mulai melakukan streaming konten file lokal menggunakan WebSockets:

    nohup websocketd --port=8000 --staticdir=. tail -f /var/tmp/taxi.json &
    

    Tindakan ini akan menjalankan perintah websocketd di latar belakang. Alat websocketd menggunakan output perintah tail dan mengalirkan setiap elemen sebagai pesan WebSocket.

  5. Periksa konten nohup.out untuk memverifikasi bahwa server telah dimulai dengan benar:

    tail nohup.out
    

    Jika semuanya berfungsi dengan baik, output-nya akan mirip dengan berikut ini:

    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving using application   : /usr/bin/tail -f /var/tmp/taxi.json
    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving static content from : .
    

Memvisualisasikan pesan

Pesan perjalanan individual yang dipublikasikan ke topik Pub/Sub memiliki struktur seperti ini:

{
  "ride_id": "562127d7-acc4-4af9-8fdd-4eedd92b6e69",
  "point_idx": 248,
  "latitude": 40.74644000000001,
  "longitude": -73.97144,
  "timestamp": "2019-03-24T00:46:08.49094-04:00",
  "meter_reading": 8.40615,
  "meter_increment": 0.033895764,
  "ride_status": "enroute",
  "passenger_count": 1
}

Berdasarkan nilai ini, Anda menghitung beberapa metrik untuk header dasbor. Penghitungan dieksekusi sekali per peristiwa perjalanan masuk. Nilai mencakup berikut ini:

  • Latensi pesan terakhir. Jumlah detik antara stempel waktu peristiwa perjalanan terakhir yang diamati dan waktu saat ini (diperoleh dari jam di sistem yang menghosting browser web).
  • Perjalanan aktif. Jumlah perjalanan yang sedang berlangsung. Jumlah ini dapat bertambah dengan cepat, dan jumlahnya akan berkurang saat nilai ride_status sebesar dropoff diamati.
  • Tarif pesan. Jumlah rata-rata peristiwa perjalanan yang diproses per detik.
  • Total jumlah yang diukur. Jumlah meter dari semua perjalanan aktif. Jumlah ini akan berkurang seiring selesainya perjalanan.
  • Jumlah total penumpang. Jumlah penumpang di semua perjalanan. Jumlah ini akan berkurang seiring selesainya perjalanan.
  • Jumlah rata-rata penumpang per perjalanan. Jumlah total perjalanan, dibagi dengan jumlah total penumpang.
  • Jumlah rata-rata yang diukur per penumpang. Jumlah total yang diukur dibagi dengan jumlah total penumpang.

Selain metrik dan sampel perjalanan individual, saat penumpang dijemput atau diturunkan, dasbor akan menampilkan notifikasi peringatan di atas petak sampel perjalanan.

  1. Dapatkan alamat IP eksternal instance saat ini:

    curl -H "Metadata-Flavor: Google" http://metadata/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip; echo
    
    
  2. Salin alamat IP.

  3. Di komputer lokal, buka browser web baru, lalu masukkan URL:

    http://$ip-address:8000.

    Anda akan melihat halaman yang menampilkan dasbor untuk tutorial ini:

    Dasbor yang dibuat oleh kode dalam tutorial ini, dengan pesan selamat datang dan sebelum data apa pun ditampilkan.

  4. Klik ikon taksi di bagian atas untuk membuka koneksi ke stream dan mulai memproses pesan.

    Perjalanan individu divisualisasikan dengan sampel sembilan perjalanan aktif yang dirender setiap tiga detik:

    Dasbor yang menampilkan perjalanan aktif.

    Anda dapat mengklik ikon taksi kapan saja untuk memulai atau menghentikan streaming WebSocket. Jika koneksi WebSocket terputus, ikon akan berubah menjadi merah, dan pembaruan metrik serta perjalanan individual akan dihentikan. Untuk menghubungkan kembali, klik ikon taksi lagi.

Performa

Screenshot berikut menunjukkan monitor performa Alat Developer Chrome saat tab browser memproses sekitar 2.100 pesan per detik.

Panel monitor performa browser yang menampilkan penggunaan CPU, ukuran heap, node DOM, dan penghitungan ulang gaya per detik. Nilainya relatif datar.

Dengan pengiriman pesan yang terjadi pada latensi sekitar 30 md, penggunaan CPU rata-rata sekitar 80%. Penggunaan memori ditampilkan minimal 29 MB, dengan total 57 MB yang dialokasikan, serta bertambah dan berkurang dengan bebas.

Pembersihan

Menghapus aturan firewall

Jika Anda menggunakan project yang ada untuk tutorial ini, Anda dapat menghapus aturan firewall yang Anda buat. Sebaiknya minimalkan port terbuka.

  1. Hapus aturan firewall yang Anda buat untuk mengizinkan TCP di port 8000:

    gcloud compute firewall-rules delete websocket
    
  2. Jika Anda juga membuat aturan firewall untuk mengizinkan konektivitas SSH, hapus aturan firewall untuk mengizinkan TCP di port 22:

    gcloud compute firewall-rules delete wss-ssh
    

Menghapus project

Jika tidak ingin menggunakan project ini lagi, Anda dapat menghapusnya.

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Langkah berikutnya