透過 WebSocket 串流傳送 Pub/Sub 訊息


本教學課程將說明前端應用程式 (在本例中為網頁) 在使用Google Cloud時,處理大量傳入資料的方式。本教學課程將說明大量串流的部分挑戰。本教學課程提供範例應用程式,說明如何使用 WebSockets 將發布至 Pub/Sub 主題的密集訊息串流視覺化,並及時處理這些訊息,以維持前端效能。

本教學課程適用於熟悉透過 HTTP 進行瀏覽器與伺服器間通訊,以及使用 HTML、CSS 和 JavaScript 編寫前端應用程式的開發人員。本教學課程假設您曾經使用過Google Cloud,且熟悉 Linux 指令列工具。

目標

  • 建立並設定虛擬機器 (VM) 執行個體,並加入必要元件,以便將 Pub/Sub 訂閱的酬載串流傳送至瀏覽器用戶端。
  • 在 VM 上設定程序,以便訂閱 Pub/Sub 主題,並將個別訊息輸出至記錄檔。
  • 安裝網路伺服器,以便提供靜態內容,並將殼層指令輸出內容串流至 WebSocket 用戶端。
  • 使用 HTML、CSS 和 JavaScript,在瀏覽器中以圖形化方式呈現 WebSocket 串流匯總資料和個別訊息範例。

費用

在本文件中,您會使用 Google Cloud的下列計費元件:

您可以使用 Pricing Calculator 根據預測用量產生預估費用。 新 Google Cloud 使用者可能符合申請免費試用的資格。

事前準備

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  5. Make sure that billing is enabled for your Google Cloud project.

  6. 打開 Cloud Shell 以執行本教學課程中列出的指令。

    前往 Cloud Shell

    您將使用 Cloud Shell 執行本教學課程中所有的終端機指令。

  7. 啟用 Compute Engine API 和 Pub/Sub API:
    gcloud services enable compute pubsub
  8. 完成本教學課程後,您可以刪除建立的資源以避免繼續計費。詳情請參閱清除所用資源一節。

簡介

隨著越來越多應用程式採用事件驅動模型,前端應用程式必須能夠與訊息服務建立簡單、低摩擦力的連線,而這類服務正是這些架構的基礎。

您可以透過多種方式將資料串流至網路瀏覽器用戶端,其中最常見的是 WebSocket。本教學課程將逐步說明如何安裝程序,以便訂閱發布至 Pub/Sub 主題的訊息串流,並將這些訊息透過網頁伺服器轉送至透過 WebSocket 連線的用戶端。

在本教學課程中,您將使用 紐約市計程車業大亨 Google Dataflow 程式碼研究室中使用的公開 Pub/Sub 主題。本主題會根據紐約市計程車暨禮車管理局行程記錄資料集中的行程歷來資料,提供模擬計程車遙測資料的即時串流。

架構

下圖顯示您在本教學課程中建構的架構。

教學課程的架構

圖表顯示位於含有 Compute Engine 資源專案之外的訊息發布端,發布端會將訊息傳送至 Pub/Sub 主題。Compute Engine 執行個體會透過 WebSocket 將訊息提供給瀏覽器,讓瀏覽器執行以 HTML5 和 JavaScript 為基礎的資訊主頁。

本教學課程會結合多種工具,建立 Pub/Sub 和 WebSocket 的橋接:

  • pulltop 是您在本教學課程中安裝的 Node.js 程式。這項工具會訂閱 Pub/Sub 主題,並將收到的訊息串流至標準輸出內容。
  • websocketd 是一款小型指令列工具,可包裝現有的指令列介面程式,並允許使用 WebSocket 存取該程式。

結合 pulltopwebsocketd 後,您可以使用 WebSocket 將從 Pub/Sub 主題串流傳送至瀏覽器的訊息接收。

調整 Pub/Sub 主題吞吐量

紐約市計程車業大亨公開的 Pub/Sub 主題每秒產生 2000 到 2500 次模擬計程車行程更新,每秒最多可達 8 MB 如果 Pub/Sub 偵測到未確認訊息的佇列數量持續增加,Pub/Sub 內建的流量控制功能就會自動降低訂閱端的訊息傳送速率。因此,您可能會發現不同工作站、網路連線和前端處理程式碼之間的訊息傳送率變化幅度很大。

有效的瀏覽器訊息處理

由於 WebSocket 串流會傳送大量訊息,因此您必須仔細編寫用於處理此串流的前端程式碼。舉例來說,您可以為每則訊息動態建立 HTML 元素。但在預期的訊息速率下,為每則訊息更新網頁可能會導致瀏覽器視窗鎖定。動態建立 HTML 元素所導致的頻繁記憶體配置作業,也會延長垃圾收集時間,進而降低使用者體驗。簡而言之,您不想針對每秒約 2, 000 則訊息的每則訊息呼叫 document.createElement()

本教學課程採用的方法,可用於管理這類密集的訊息串流,如下所示:

  • 即時計算並持續更新一組串流指標,以匯總值的形式顯示觀察訊息的大部分資訊。
  • 使用瀏覽器版資訊主頁,按預先定義的時間表將個別訊息的少量樣本視覺化,只顯示即時的下車和上車事件。

下圖顯示在本教學課程中建立的資訊主頁。

本教學課程中程式碼在網頁上建立的資訊主頁

圖表顯示最後一則訊息的延遲時間為 24 毫秒,每秒傳送的訊息數量為 2100 則。如果處理個別訊息的重要程式碼路徑未及時完成,則每秒觀察到的訊息數量會隨著最後一則訊息的延遲時間增加而減少。行程取樣作業會使用 JavaScript setInterval API 進行,並將其設定為每三秒週期一次,以免前端在其生命週期內建立大量 DOM 元素。(實際上,在每秒超過 10 次的情況下,絕大多數的事件都無法觀察到)。

資訊主頁會在串流中間開始處理事件,因此如果行程尚未開始,資訊主頁就會將其視為新行程,程式碼會使用關聯陣列儲存每趟觀察到的行程,並以 ride_id 值編入索引,並在乘客下車時移除對特定行程的參照。處於「行程中」或「接送」狀態的行程會新增對該陣列的參照,除非 (針對「行程中」的情況) 先前已觀察到該行程。

安裝及設定 WebSocket 伺服器

首先,您需要建立要用於 WebSocket 伺服器的 Compute Engine 執行個體。建立執行個體後,您可以安裝日後需要的工具。

  1. 在 Cloud Shell 中設定預設的 Compute Engine 區域。以下範例顯示 us-central1-a,但您可以使用任何所需的區域。

    gcloud config set compute/zone us-central1-a
    
  2. 在預設區域中建立名為 websocket-server 的 Compute Engine 執行個體:

    gcloud compute instances create websocket-server --tags wss
    
  3. 新增防火牆規則,允許通訊埠 8000 的 TCP 流量傳送至標記為 wss 的任何執行個體:

    gcloud compute firewall-rules create websocket \
        --direction=IN \
        --allow=tcp:8000 \
        --target-tags=wss
    
  4. 如果您使用的是現有專案,請確認 TCP 通訊埠 22 已開啟,以便與執行個體建立 SSH 連線。

    根據預設,系統會在預設網路中啟用 default-allow-ssh 防火牆規則。不過,如果您或管理員移除了現有專案中的預設規則,TCP 通訊埠 22 可能就不會開啟。(如果您是為了這個教學課程建立新專案,則系統會預設啟用這項規則,您不必採取任何行動)。

    新增防火牆規則,允許通訊埠 22 的 TCP 流量傳送至標記為 wss 的任何執行個體:

    gcloud compute firewall-rules create wss-ssh \
        --direction=IN \
        --allow=tcp:22 \
        --target-tags=wss
    
  5. 使用 SSH 連線至執行個體:

    gcloud compute ssh websocket-server
    
  6. 在執行個體的終端機指令中,將帳戶切換為 root,以便安裝軟體:

    sudo -s
    
  7. 安裝 gitunzip 工具:

    apt-get install -y unzip git
    
  8. 在執行個體上安裝 websocketd 二進位檔:

    cd /var/tmp/
    wget \
    https://github.com/joewalnes/websocketd/releases/download/v0.3.0/websocketd-0.3.0-linux_386.zip
    unzip websocketd-0.3.0-linux_386.zip
    mv websocketd /usr/bin
    

安裝 Node.js 和教學課程程式碼

  1. 在執行個體的終端機中,安裝 Node.js:

    curl -sL https://deb.nodesource.com/setup_10.x | bash -
    apt-get install -y nodejs
    
  2. 下載教學課程來源存放區:

    exit
    cd ~
    git clone https://github.com/GoogleCloudPlatform/solutions-pubsub-websockets.git
    
  3. 變更 pulltop 的權限,允許執行:

    cd solutions-pubsub-websockets
    chmod 755 pulltop/pulltop.js
    
  4. 安裝 pulltop 依附元件:

    cd pulltop
    npm install
    sudo npm link
    

測試 pulltop 是否能讀取訊息

  1. 在執行個體上,針對公開主題執行 pulltop

    pulltop projects/pubsub-public-data/topics/taxirides-realtime
    

    如果 pulltop 運作正常,您會看到以下類似的結果串流:

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_stat
    us":"enroute","passenger_count":1}
  2. 按下 Ctrl+C 即可停止串流。

建立傳送至 websocketd 的訊息流

您已確認 pulltop 可以讀取 Pub/Sub 主題,現在可以啟動 websocketd 程序,開始傳送訊息至瀏覽器。

將主題訊息擷取到本機檔案

在本教學課程中,您將擷取從 pulltop 取得的訊息串流,並將其寫入本機檔案。將訊息流量擷取到本機檔案會增加儲存空間需求,但也會將 websocketd 程序的作業與串流 Pub/Sub 主題訊息分離。在本機擷取資訊可讓您在暫停 Pub/Sub 串流 (例如調整流量控管參數) 時,不必強制重設目前已連線的 WebSocket 用戶端。訊息串流重新建立後,websocketd 會自動恢復向用戶端串流訊息。

  1. 在執行個體上針對公開主題執行 pulltop,並將訊息輸出內容重新導向至本機 taxi.json 檔案。如果您登出或關閉終端機,nohup 指令會指示作業系統繼續執行 pulltop 程序。

    nohup pulltop \
      projects/pubsub-public-data/topics/taxirides-realtime > \
      /var/tmp/taxi.json &
    
  2. 確認 JSON 訊息是否已寫入檔案:

    tail /var/tmp/taxi.json
    

    如果訊息會寫入 taxi.json 檔案,輸出內容會類似以下內容:

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_sta
    tus":"enroute","passenger_count":1}
  3. 切換至應用程式的網頁資料夾:

    cd ../web
    
  4. 啟動 websocketd,開始使用 WebSocket 串流處理本機檔案的內容:

    nohup websocketd --port=8000 --staticdir=. tail -f /var/tmp/taxi.json &
    

    這會在背景執行 websocketd 指令。websocketd 工具會使用 tail 指令的輸出內容,並將每個元素串流傳送為 WebSocket 訊息。

  5. 檢查 nohup.out 的內容,確認伺服器是否已正確啟動:

    tail nohup.out
    

    如果一切運作正常,輸出結果會如下所示:

    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving using application   : /usr/bin/tail -f /var/tmp/taxi.json
    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving static content from : .
    

以視覺化方式顯示訊息

發布至 Pub/Sub 主題的個別行程訊息結構如下所示:

{
  "ride_id": "562127d7-acc4-4af9-8fdd-4eedd92b6e69",
  "point_idx": 248,
  "latitude": 40.74644000000001,
  "longitude": -73.97144,
  "timestamp": "2019-03-24T00:46:08.49094-04:00",
  "meter_reading": 8.40615,
  "meter_increment": 0.033895764,
  "ride_status": "enroute",
  "passenger_count": 1
}

您可以根據這些值,計算資訊主頁標題的多個指標。系統會針對每個上車行程事件執行一次計算。值包括:

  • 上次傳送訊息的延遲時間。最後一次觀察到的行程事件時間戳記與目前時間 (取自代管網頁瀏覽器的系統時鐘) 之間的秒數。
  • 行程進行中。目前行程的數量。這個數字可能會迅速增加,並且在觀察到 ride_status 值為 dropoff 時會減少。
  • 訊息傳送頻率:每秒處理的平均行程事件數。
  • 總計量數量。所有有效行程的計程表總和。隨著行程結束,這個數字會減少。
  • 乘客總人數。所有行程的乘客人數。隨著行程完成,這個數字會減少。
  • 每趟行程的平均乘客人數。行程總數除以乘客總數。
  • 每位乘客的平均計費金額。總計費率金額除以乘客總人數。

除了指標和個別行程樣本之外,當乘客上車或下車時,資訊主頁會在行程樣本的格線上方顯示快訊通知。

  1. 取得目前執行個體的外部 IP 位址:

    curl -H "Metadata-Flavor: Google" http://metadata/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip; echo
    
    
  2. 複製 IP 位址。

  3. 在本機電腦上,開啟新的網路瀏覽器,然後輸入下列網址:

    http://$ip-address:8000

    您會看到顯示本教學課程資訊主頁的頁面:

    本教學課程中程式碼建立的資訊主頁,包含歡迎訊息,以及在顯示任何資料之前。

  4. 按一下頂端的計程車圖示,開啟與串流的連線,並開始處理訊息。

    個別行程會以視覺化方式呈現,每三秒會顯示九個活動行程的樣本:

    資訊主頁顯示有效行程。

    您隨時可以按一下計程車圖示,開始或停止 WebSocket 串流。如果 WebSocket 連線中斷,圖示會變成紅色,指標和個別行程的更新也會暫停。如要重新連線,請再次點選計程車圖示。

成效

下圖顯示 Chrome 開發人員工具效能監控器,當時瀏覽器分頁每秒處理約 2100 則訊息。

瀏覽器效能監控窗格,顯示 CPU 用量、堆積大小、DOM 節點和每秒樣式重新計算次數。值相對平坦。

訊息調度作業的延遲時間約為 30 毫秒,CPU 使用率平均約為 80%。記憶體使用率顯示為至少 29 MB,總共分配 57 MB,且可自由增減。

清除所用資源

移除防火牆規則

如果您使用現有專案進行本教學課程,可以移除您建立的防火牆規則。建議您盡可能減少開放的連接埠。

  1. 刪除您建立的防火牆規則,以便允許通訊埠 8000 上的 TCP:

    gcloud compute firewall-rules delete websocket
    
  2. 如果您也建立了防火牆規則來允許 SSH 連線,請刪除防火牆規則,以便在 22 通訊埠上允許 TCP:

    gcloud compute firewall-rules delete wss-ssh
    

刪除專案

如果不想再使用這個專案,可以刪除專案。

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

後續步驟