Streaming di messaggi Pub/Sub tramite WebSocket


Questo tutorial illustra un modo per un'app frontend, in questo caso una pagina web, di gestire volumi elevati di dati in entrata quando utilizziGoogle Cloud. Il tutorial descrive alcune delle sfide dei flussi ad alto volume. In questo tutorial viene fornita un'app di esempio che illustra come utilizzare WebSockets per visualizzare un flusso denso di messaggi pubblicati in un argomento Pub/Sub, elaborandoli in modo tempestivo per mantenere un frontend performante.

Questo tutorial è rivolto agli sviluppatori che hanno familiarità con la comunicazione browser-server tramite HTTP e con la scrittura di app frontend utilizzando HTML, CSS e JavaScript. Il tutorial presuppone che tu abbia esperienza con Google Cloud e che tu abbia familiarità con gli strumenti a riga di comando Linux.

Obiettivi

  • Crea e configura un'istanza di macchina virtuale (VM) con i componenti necessari per trasmettere in streaming i payload di una sottoscrizione Pub/Sub ai client browser.
  • Configura un processo sulla VM per sottoscrivere un argomento Pub/Sub e inviare i singoli messaggi a un log.
  • Installa un server web per gestire contenuti statici e trasmettere l'output del comando shell ai client WebSocket.
  • Visualizza le aggregazioni del flusso WebSocket e i singoli campioni di messaggi in un browser utilizzando HTML, CSS e JavaScript.

Costi

In questo documento utilizzi i seguenti componenti fatturabili di Google Cloud:

Per generare una stima dei costi in base all'utilizzo previsto, utilizza il calcolatore prezzi.

I nuovi Google Cloud utenti potrebbero avere diritto a una prova gratuita.

Prima di iniziare

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Apri Cloud Shell per eseguire i comandi elencati in questo tutorial.

    VAI A Cloud Shell

    Esegui tutti i comandi del terminale in questo tutorial da Cloud Shell.

  7. Abilita l'API Compute Engine e l'API Pub/Sub:
    gcloud services enable compute pubsub
  8. Al termine di questo tutorial, puoi evitare l'addebito di ulteriori costi eliminando le risorse create. Per maggiori dettagli, vedi Esegui la pulizia.

Introduzione

Man mano che sempre più app adottano modelli basati sugli eventi, è importante che le app frontend siano in grado di stabilire connessioni semplici e senza problemi con i servizi di messaggistica che costituiscono la base di queste architetture.

Esistono diverse opzioni per lo streaming di dati ai client browser web; la più comune è WebSockets. Questo tutorial ti guida nell'installazione di un processo che si iscrive a un flusso di messaggi pubblicati in un argomento Pub/Sub e li indirizza tramite il server web ai client connessi tramite WebSocket.

Per questo tutorial, utilizzerai l'argomento Pub/Sub disponibile pubblicamente utilizzato nel codelab Google Dataflow NYC Taxi Tycoon. Questo argomento fornisce un flusso in tempo reale di telemetria simulata dei taxi basata sui dati storici delle corse effettuate a New York dai set di dati della Taxi & Limousine Commission.

Architettura

Il seguente diagramma mostra l'architettura del tutorial che creerai in questo tutorial.

Architettura del tutorial

Il diagramma mostra un publisher di messaggi esterno al progetto che contiene la risorsa Compute Engine; il publisher invia messaggi a un argomento Pub/Sub. L'istanza Compute Engine rende i messaggi disponibili tramite WebSocket a un browser che esegue un dashboard basato su HTML5 e JavaScript.

Questo tutorial utilizza una combinazione di strumenti per collegare Pub/Sub e Websocket:

  • pulltop è un programma Node.js che installi nell'ambito di questo tutorial. Lo strumento si iscrive a un argomento Pub/Sub e trasmette in streaming i messaggi ricevuti all'output standard.
  • websocketd è un piccolo strumento a riga di comando che racchiude un programma di interfaccia a riga di comando esistente e consente di accedervi utilizzando un WebSocket.

Combinando pulltop e websocketd, puoi trasmettere in streaming i messaggi ricevuti dall'argomento Pub/Sub a un browser utilizzando WebSocket.

Regolazione della velocità effettiva dell'argomento Pub/Sub

L'argomento Pub/Sub pubblico NYC Taxi Tycoon genera da 2000 a 2500 aggiornamenti simulati delle corse in taxi al secondo, fino a 8 MB o più al secondo. Il controllo del flusso integrato in Pub/Sub rallenta automaticamente la velocità dei messaggi di un abbonato se Pub/Sub rileva una coda crescente di messaggi non confermati. Pertanto, potresti notare una variabilità elevata della frequenza dei messaggi in diverse workstation, connessioni di rete e codice di elaborazione front-end.

Elaborazione efficace dei messaggi del browser

Dato l'elevato volume di messaggi provenienti dal flusso WebSocket, devi prestare attenzione alla scrittura del codice frontend che elabora questo flusso. Ad esempio, puoi creare dinamicamente elementi HTML per ogni messaggio. Tuttavia, alla velocità di messaggi prevista, l'aggiornamento della pagina per ogni messaggio potrebbe bloccare la finestra del browser. Le allocazioni di memoria frequenti derivanti dalla creazione dinamica di elementi HTML estendono anche le durate della garbage collection, peggiorando l'esperienza utente. In breve, non vuoi chiamare document.createElement() per ciascuno dei circa 2000 messaggi che arrivano ogni secondo.

L'approccio adottato da questo tutorial per gestire questo flusso denso di messaggi è il seguente:

  • Calcola e aggiorna continuamente un insieme di metriche di streaming in tempo reale, mostrando la maggior parte delle informazioni sui messaggi osservati come valori aggregati.
  • Utilizza una dashboard basata sul browser per visualizzare un piccolo campione di singoli messaggi in base a una pianificazione predefinita, che mostra solo gli eventi di rilascio e ritiro in tempo reale.

La figura seguente mostra la dashboard creata nell'ambito di questo tutorial.

Dashboard creata nella pagina web dal codice di questo tutorial

La figura mostra una latenza dell'ultimo messaggio di 24 millisecondi a una velocità di quasi 2100 messaggi al secondo. Se i percorsi di codice critici per l'elaborazione di ogni singolo messaggio non vengono completati in tempo, il numero di messaggi osservati al secondo diminuisce man mano che aumenta la latenza dell'ultimo messaggio. Il campionamento della corsa viene eseguito utilizzando il set di API JavaScript setInterval impostato sul ciclo una volta ogni tre secondi, il che impedisce al frontend di creare un numero enorme di elementi DOM durante il suo ciclo di vita. La stragrande maggioranza di questi è praticamente non osservabile a velocità superiori a 10 al secondo.

La dashboard inizia a elaborare gli eventi a metà dello stream, quindi le corse già in corso vengono riconosciute come nuove dalla dashboard, a meno che non siano già state visualizzate. Il codice utilizza un array associativo per memorizzare ogni corsa osservata, indicizzata in base al valore ride_id e rimuove il riferimento a una corsa specifica quando il passeggero è stato accompagnato. Le corse in stato "in viaggio" o "in attesa" aggiungono un riferimento a questo array, a meno che (nel caso di "in viaggio") la corsa non sia stata osservata in precedenza.

Installa e configura il server WebSocket

Per iniziare, crea un'istanza Compute Engine che utilizzerai come server WebSocket. Dopo aver creato l'istanza, installa gli strumenti che ti serviranno in seguito.

  1. In Cloud Shell, imposta la zona di Compute Engine predefinita. L'esempio seguente mostra us-central1-a, ma puoi utilizzare qualsiasi zona che preferisci.

    gcloud config set compute/zone us-central1-a
    
  2. Crea un'istanza Compute Engine denominata websocket-server nella zona predefinita:

    gcloud compute instances create websocket-server --tags wss
    
  3. Aggiungi una regola firewall che consenta il traffico TCP sulla porta 8000 a qualsiasi istanza taggata come wss:

    gcloud compute firewall-rules create websocket \
        --direction=IN \
        --allow=tcp:8000 \
        --target-tags=wss
    
  4. Se utilizzi un progetto esistente, assicurati che la porta TCP 22 sia aperta per consentire la connettività SSH all'istanza.

    Per impostazione predefinita, la regola firewall default-allow-ssh è abilitata nella rete predefinita. Tuttavia, se tu o il tuo amministratore avete rimosso la regola predefinita in un progetto esistente, la porta TCP 22 potrebbe non essere aperta. Se hai creato un nuovo progetto per questo tutorial, la regola è attivata per impostazione predefinita e non devi fare nulla.

    Aggiungi una regola firewall che consenta il traffico TCP sulla porta 22 a qualsiasi istanza taggata come wss:

    gcloud compute firewall-rules create wss-ssh \
        --direction=IN \
        --allow=tcp:22 \
        --target-tags=wss
    
  5. Connettiti all'istanza tramite SSH:

    gcloud compute ssh websocket-server
    
  6. Nel comando del terminale dell'istanza, cambia account in root in modo da poter installare il software:

    sudo -s
    
  7. Installa gli strumenti git e unzip:

    apt-get install -y unzip git
    
  8. Installa il programma binario websocketd sull'istanza:

    cd /var/tmp/
    wget \
    https://github.com/joewalnes/websocketd/releases/download/v0.3.0/websocketd-0.3.0-linux_386.zip
    unzip websocketd-0.3.0-linux_386.zip
    mv websocketd /usr/bin
    

Installa Node.js e il codice del tutorial

  1. In un terminale dell'istanza, installa Node.js:

    curl -sL https://deb.nodesource.com/setup_10.x | bash -
    apt-get install -y nodejs
    
  2. Scarica il repository di codice sorgente del tutorial:

    exit
    cd ~
    git clone https://github.com/GoogleCloudPlatform/solutions-pubsub-websockets.git
    
  3. Modifica le autorizzazioni su pulltop per consentire l'esecuzione:

    cd solutions-pubsub-websockets
    chmod 755 pulltop/pulltop.js
    
  4. Installa le dipendenze di pulltop:

    cd pulltop
    npm install
    sudo npm link
    

Verifica che pulltop possa leggere i messaggi

  1. Sull'istanza, esegui pulltop sull'argomento pubblico:

    pulltop projects/pubsub-public-data/topics/taxirides-realtime
    

    Se pulltop funziona, vedrai un flusso di risultati simile al seguente:

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_stat
    us":"enroute","passenger_count":1}
  2. Premi Ctrl+C per interrompere lo streaming.

Stabilire il flusso di messaggi a websocketd

Ora che hai stabilito che pulltop può leggere l'argomento Pub/Sub, puoi avviare il processo websocketd per iniziare a inviare messaggi al browser.

Acquisire i messaggi dell'argomento in un file locale

Per questo tutorial, acquisisci il flusso di messaggi che ricevi da pulltop e scrivilo in un file locale. L'acquisizione del traffico di messaggi in un file locale aggiunge un requisito di archiviazione, ma disaccoppia anche il funzionamento del processo websocketd dai messaggi dell'argomento Pub/Sub di streaming. L'acquisizione delle informazioni in locale consente scenari in cui potresti voler interrompere temporaneamente lo streaming Pub/Sub (magari per modificare i parametri di controllo del flusso) senza forzare un ripristino dei client WebSocket attualmente connessi. Quando il flusso di messaggi viene ristabilito, websocketd riprende automaticamente lo streaming dei messaggi ai client.

  1. Nell'istanza, esegui pulltop sull'argomento pubblico e reindirizza l'output del messaggio al file locale taxi.json. Il comando nohup indica al sistema operativo di mantenere in esecuzione il processo pulltop se esci o chiudi il terminale.

    nohup pulltop \
      projects/pubsub-public-data/topics/taxirides-realtime > \
      /var/tmp/taxi.json &
    
  2. Verifica che i messaggi JSON vengano scritti nel file:

    tail /var/tmp/taxi.json
    

    Se i messaggi vengono scritti nel file taxi.json, l'output è simile al seguente:

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_sta
    tus":"enroute","passenger_count":1}
  3. Passa alla cartella web della tua app:

    cd ../web
    
  4. Avvia websocketd per iniziare lo streaming dei contenuti del file locale utilizzando WebSocket:

    nohup websocketd --port=8000 --staticdir=. tail -f /var/tmp/taxi.json &
    

    In questo modo viene eseguito il comando websocketd in background. Lo strumento websocketd utilizza l'output del comando tail e trasmette in streaming ogni elemento come messaggio WebSocket.

  5. Controlla i contenuti di nohup.out per verificare che il server sia stato avviato correttamente:

    tail nohup.out
    

    Se tutto funziona correttamente, l'output è simile al seguente:

    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving using application   : /usr/bin/tail -f /var/tmp/taxi.json
    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving static content from : .
    

Visualizzare i messaggi

I singoli messaggi di corsa pubblicati nell'argomento Pub/Sub hanno una struttura simile a questa:

{
  "ride_id": "562127d7-acc4-4af9-8fdd-4eedd92b6e69",
  "point_idx": 248,
  "latitude": 40.74644000000001,
  "longitude": -73.97144,
  "timestamp": "2019-03-24T00:46:08.49094-04:00",
  "meter_reading": 8.40615,
  "meter_increment": 0.033895764,
  "ride_status": "enroute",
  "passenger_count": 1
}

In base a questi valori, calcoli diverse metriche per l'intestazione della dashboard. I calcoli vengono eseguiti una volta per ogni evento di corsa in arrivo. I valori includono quanto segue:

  • Latenza dell'ultimo messaggio. Il numero di secondi tra il timestamp dell'ultimo evento di corsa osservato e l'ora corrente (derivata dall'orologio del sistema che ospita il browser web).
  • Corse attive. Il numero di corse attualmente in corso. Questo numero può aumentare rapidamente e diminuisce quando viene osservato un valore ride_status di dropoff.
  • Tariffa dei messaggi. Il numero medio di eventi di corsa elaborati al secondo.
  • Importo totale misurato. La somma dei tassametri di tutte le corse attive. Questo numero diminuisce man mano che le corse vengono completate.
  • Numero totale di passeggeri. Il numero di passeggeri in tutte le corse. Questo numero diminuisce man mano che le corse vengono completate.
  • Numero medio di passeggeri per corsa. Il numero totale di corse, diviso per il numero totale di passeggeri.
  • Importo medio tassametro per passeggero. L'importo totale misurato diviso per il numero totale di passeggeri.

Oltre alle metriche e ai singoli campioni di corse, quando un passeggero viene prelevato o accompagnato, la dashboard mostra una notifica di avviso sopra la griglia dei campioni di corse.

  1. Ottieni l'indirizzo IP esterno dell'istanza corrente:

    curl -H "Metadata-Flavor: Google" http://metadata/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip; echo
    
    
  2. Copia l'indirizzo IP.

  3. Sul computer locale, apri un nuovo browser web e inserisci l'URL:

    http://$ip-address:8000.

    Viene visualizzata una pagina che mostra la dashboard per questo tutorial:

    Dashboard creata dal codice in questo tutorial, con messaggio di benvenuto e prima della visualizzazione dei dati.

  4. Fai clic sull'icona del taxi in alto per aprire una connessione allo stream e iniziare a elaborare i messaggi.

    Le singole corse vengono visualizzate con un campione di nove corse attive renderizzate ogni tre secondi:

    Dashboard che mostra le corse attive.

    Puoi fare clic sull'icona del taxi in qualsiasi momento per avviare o interrompere lo stream WebSocket. Se la connessione WebSocket viene interrotta, l'icona diventa rossa e gli aggiornamenti delle metriche e delle singole corse vengono interrotti. Per riconnetterti, fai di nuovo clic sull'icona del taxi.

Prestazioni

Lo screenshot seguente mostra il monitor delle prestazioni degli Strumenti per sviluppatori di Chrome mentre la scheda del browser elabora circa 2100 messaggi al secondo.

Il riquadro del monitoraggio delle prestazioni del browser mostra l'utilizzo della CPU, le dimensioni dell'heap, i nodi DOM e i ricalcoli dello stile al secondo. I valori sono relativamente piatti.

Con l'invio dei messaggi con una latenza di circa 30 ms, l'utilizzo della CPU è in media intorno all'80%. L'utilizzo della memoria viene visualizzato a un minimo di 29 MB, con 57 MB allocati in totale, e aumenta e diminuisce liberamente.

Esegui la pulizia

Rimuovi regole firewall

Se hai utilizzato un progetto esistente per questo tutorial, puoi rimuovere le regole firewall che hai creato. È consigliabile ridurre al minimo le porte aperte.

  1. Elimina la regola firewall che hai creato per consentire il traffico TCP sulla porta 8000:

    gcloud compute firewall-rules delete websocket
    
  2. Se hai creato anche una regola firewall per consentire la connettività SSH, elimina la regola firewall per consentire TCP sulla porta 22:

    gcloud compute firewall-rules delete wss-ssh
    

Elimina il progetto

Se non vuoi utilizzare di nuovo questo progetto, puoi eliminarlo.

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Passaggi successivi