Streaming von Pub/Sub-Nachrichten über WebSockets


In dieser Anleitung wird erläutert, wie Sie in Google Cloud große Mengen eingehender Daten für eine Frontend-Anwendung (in diesem Fall eine Webseite) verwalten können. Es werden auch einige der Herausforderungen beschrieben, die bei Streams mit hohem Volumen auftreten können. Ferner ist eine Beispielanwendung enthalten, die veranschaulicht, wie Sie mit WebSockets einen dichten Stream von Nachrichten visualisieren können, um sie effizient zu verarbeiten und eine hohe Leistung des Front-Ends zu gewährleisten.

Diese Anwendung richtet sich an Entwickler, die mit der Kommunikation zwischen Browser und Server über HTTP und dem Schreiben von Frontend-Anwendungen mit HTML, CSS und JavaScript vertraut sind. In diesem Dokument wird vorausgesetzt, dass Sie mit Google Cloud und den Befehlszeilentools von Linux vertraut sind.

Ziele

  • VM-Instanz mit den erforderlichen Komponenten erstellen und konfigurieren, um die Nutzlasten eines Pub/Sub-Abos an Browser-Clients zu streamen
  • Prozess auf der VM konfigurieren, um ein Pub/Sub-Thema zu abonnieren und die einzelnen Nachrichten in einem Log auszugeben
  • Webserver installieren, um statische Inhalte bereitzustellen und Shell-Befehlsausgaben an WebSocket-Clients zu streamen
  • Zusammenfassungen von WebSocket-Streams und einzelne Nachrichtenbeispiele in einem Browser mit HTML, CSS und JavaScript visualisieren

Kosten

In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:

Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen. Neuen Google Cloud-Nutzern steht möglicherweise eine kostenlose Testversion zur Verfügung.

Hinweise

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Öffnen Sie Cloud Shell, um die in dieser Anleitung aufgeführten Befehle auszuführen.

    Zu Cloud Shell

    Alle Terminalbefehle in dieser Anleitung werden über Cloud Shell ausgeführt.

  7. Aktivieren Sie die Compute Engine API und die Pub/Sub API:
    gcloud services enable compute pubsub

Nach Abschluss dieser Anleitung können Sie weitere Kosten durch Löschen von erstellten Ressourcen vermeiden. Weitere Informationen finden Sie im Abschnitt Bereinigen.

Einleitung

Immer mehr Anwendungen arbeiten nach ereignisgesteuerten Modellen. Daher ist es wichtig, dass Frontend-Anwendungen einfach und reibungslos Verbindungen zu den Messaging-Diensten herstellen können, die diesen Architekturen zugrunde liegen.

Es gibt verschiedene Möglichkeiten für das Streaming von Daten an Webbrowser-Clients. Am häufigsten werden WebSockets verwendet. Diese Anleitung führt Sie durch die Installation eines Prozesses, mit dem ein Nachrichtenstrom abonniert, in einem Pub/Sub-Thema veröffentlicht und anschließend über den Webserver an Clients weitergeleitet wird, die über WebSockets verbunden sind.

In dieser Anleitung arbeiten Sie mit dem öffentlich verfügbaren Pub/Sub-Thema, das im Google Dataflow-Codelab "NYC Taxi Tycoon" verwendet wird. Dieses Thema beinhaltet einen Echtzeitstrom simulierter Taxi-Telemetriedaten, die auf Fahrtenaufzeichnungen aus den Datasets der Taxi & Limousine Commission aus New York City beruhen.

Architektur

Das nachstehende Diagramm veranschaulicht den Aufbau der Architektur, die Sie in dieser Anleitung erstellen.

Architektur des Tutorials

Das Diagramm zeigt einen Nachrichten-Publisher außerhalb des Projekts, das die Compute Engine-Ressource enthält. Der Publisher sendet Nachrichten an ein Pub/Sub-Thema. Die Compute Engine-Instanz stellt die Nachrichten über WebSockets einem Browser zur Verfügung, in dem ein HTML5- und JavaScript-gestütztes Dashboard ausgeführt wird.

In dieser Anleitung werden verschiedene Tools kombiniert, um Pub/Sub und WebSockets zu verbinden:

  • pulltop ist ein Node.js-Programm, das Sie im Verlauf dieser Anleitung installieren. Das Tool abonniert ein Pub/Sub-Thema und streamt die empfangenen Nachrichten an die Standardausgabe.
  • websocketd ist ein kleines Befehlszeilentool, das eine vorhandene Befehlszeile einschließt und den Zugriff über ein WebSocket ermöglicht.

Durch die Kombination von pulltop und websocketd können Sie Nachrichten von dem Pub/Sub-Thema empfangen und über WebSockets an einen Browser streamen.

Durchsatz des Pub/Sub-Themas festlegen

Das öffentliche Pub/Sub-Thema von NYC Taxi Tycoon generiert 2.000 bis 2.500 simulierte Aktualisierungen von Taxifahrten pro Sekunde, was 8 MB oder mehr pro Sekunde entspricht. Die in Pub/Sub integrierte Ablaufsteuerung verringert den Nachrichtendurchsatz eines Abonnenten automatisch, wenn Pub/Sub eine wachsende Warteschlange nicht bestätigter Nachrichten erkennt. Daher kann der Nachrichtendurchsatz je nach Workstation, Netzwerkverbindung und Verarbeitungscode des Front-Ends stark variieren.

Effiziente Verarbeitung von Browsermeldungen

Angesichts des hohen Nachrichtenvolumens, das über den WebSocket-Stream eingeht, müssen Sie beim Schreiben des Frontend-Codes, der diesen Stream verarbeitet, sorgfältig vorgehen. Beispielsweise könnten Sie dynamisch HTML-Elemente für jede Nachricht erstellen. Bei dem erwarteten Nachrichtendurchsatz kann das Aktualisieren der Seite für jede Nachricht jedoch das Browserfenster blockieren. Häufige Arbeitsspeicherzuweisungen, die sich aus der dynamischen Erstellung von HTML-Elementen ergeben, verlängern auch die automatische Speicherbereinigung und beeinträchtigen den Nutzerkomfort. Mit anderen Worten, Sie werden nicht für jede der rund 2.000 Nachrichten, die pro Sekunde eingehen, document.createElement() aufrufen wollen.

In dieser Anleitung wird daher für die Verwaltung dieses dichten Nachrichtenstreams folgender Ansatz verwendet:

  • Berechnung und kontinuierliche Aktualisierung einiger Streaming-Messwerte in Echtzeit. Dabei werden die meisten Informationen zu den beobachteten Nachrichten als zusammengefasste Werte angezeigt.
  • Verwendung eines browsergestützten Dashboards zur Anzeige eines kleinen Auszugs der einzelnen Nachrichten nach einem vorprogrammierten Zeitplan, wobei nur Ein- und Ausstiegsereignisse in Echtzeit angezeigt werden.

Die nachstehende Abbildung zeigt das Dashboard, das im Rahmen dieser Anleitung erstellt wird.

Das in dieser Anleitung erstellte Dashboard auf der Webseite

Die Abbildung zeigt eine Latenz der letzten Nachricht von 24 Millisekunden bei einem Durchsatz von fast 2.100 Nachrichten pro Sekunde. Wenn die für die Verarbeitung jeder einzelnen Nachricht kritischen Codepfade nicht rechtzeitig abgeschlossen werden, nimmt die Anzahl der pro Sekunde beobachteten Nachrichten mit zunehmender Latenz der letzten Nachricht ab. Die Erfassung der Fahrt erfolgt mit der JavaScript API setInterval, die alle drei Sekunden ausgeführt wird. Damit wird verhindert, dass das Frontend während seiner Laufzeit eine zu hohe Anzahl von DOM-Elementen erstellt. Bei Durchsatzraten von mehr als 10 pro Sekunde kann der überwiegende Teil dieser Elemente in jedem Fall nicht mehr beobachtet werden.

Das Dashboard beginnt mit der Verarbeitung von Ereignissen in der Mitte des Streams, sodass bereits laufende Fahrten vom Dashboard als neu erkannt werden, sofern sie nicht zuvor erfasst wurden. Der Code verwendet ein assoziatives Array zum Speichern jeder beobachteten Fahrt, das durch den Wert ride_id indexiert wird, und entfernt den Bezug zur spezifischen Fahrt, wenn der Fahrgast ausgestiegen ist. Fahrten im Status "enroute" oder "pickup" fügen einen Bezug zum Array hinzu, es sei denn (im Fall von "enroute"), die Fahrt wurde bereits vorher beobachtet.

WebSocket-Server installieren und konfigurieren

Erstellen Sie zuerst die Compute Engine-Instanz, die Sie als WebSocket-Server nutzen werden. Installieren Sie auf dieser Instanz dann die Tools, die Sie später benötigen.

  1. Legen Sie in Cloud Shell die Standardzone für Compute Engine fest. Im folgenden Beispiel wird us-central1-a verwendet, Sie können jedoch eine beliebige Zone einstellen.

    gcloud config set compute/zone us-central1-a
    
  2. Erstellen Sie eine Compute Engine-Instanz mit dem Namen websocket-server in der Standardzone:

    gcloud compute instances create websocket-server --tags wss
    
  3. Fügen Sie eine Firewall-Regel hinzu, die TCP-Traffic auf Port 8000 zu jeder als wss gekennzeichneten Instanz zulässt:

    gcloud compute firewall-rules create websocket \
        --direction=IN \
        --allow=tcp:8000 \
        --target-tags=wss
    
  4. Stellen Sie bei Verwendung eines vorhandenen Projekts sicher, dass der TCP-Port 22 geöffnet ist, um die SSH-Verbindung zur Instanz zuzulassen.

    Standardmäßig ist die Firewallregel default-allow-ssh im Standardnetzwerk aktiviert. Wenn Sie oder Ihr Administrator die Standardregel in einem vorhandenen Projekt jedoch entfernt haben, ist der TCP-Port 22 möglicherweise nicht geöffnet. (Wenn Sie ein neues Projekt für diese Anleitung erstellt haben, ist die Regel standardmäßig aktiviert und Sie müssen nichts tun.)

    Fügen Sie eine Firewall-Regel hinzu, die TCP-Traffic auf Port 22 zu jeder als wss gekennzeichneten Instanz zulässt:

    gcloud compute firewall-rules create wss-ssh \
        --direction=IN \
        --allow=tcp:22 \
        --target-tags=wss
    
  5. Stellen Sie über SSH eine Verbindung zur Instanz her:

    gcloud compute ssh websocket-server
    
  6. Stellen Sie im Terminalbefehl der Instanz die Konten auf root, damit Sie die Software installieren können:

    sudo -s
    
  7. Installieren Sie die Tools git und unzip.

    apt-get install -y unzip git
    
  8. Installieren Sie die Binärdatei websocketd auf der Instanz:

    cd /var/tmp/
    wget \
    https://github.com/joewalnes/websocketd/releases/download/v0.3.0/websocketd-0.3.0-linux_386.zip
    unzip websocketd-0.3.0-linux_386.zip
    mv websocketd /usr/bin
    

Installieren Sie Node.js und den Anleitungscode.

  1. Installieren Sie in einem Terminal der Instanz Node.js:

    curl -sL https://deb.nodesource.com/setup_10.x | bash -
    apt-get install -y nodejs
    
  2. Laden Sie das Quell-Repository der Anleitung herunter:

    exit
    cd ~
    git clone https://github.com/GoogleCloudPlatform/solutions-pubsub-websockets.git
    
  3. Ändern Sie die Berechtigungen in pulltop, um die Ausführung zuzulassen.

    cd solutions-pubsub-websockets
    chmod 755 pulltop/pulltop.js
    
  4. Installieren Sie die Abhängigkeiten für pulltop:

    cd pulltop
    npm install
    sudo npm link
    

Testen, ob pulltop Nachrichten lesen kann

  1. Führen Sie auf der Instanz pulltop für das öffentliche Thema aus:

    pulltop projects/pubsub-public-data/topics/taxirides-realtime
    

    Wenn pulltop funktioniert, wird ein Ergebnis-Stream wie der folgende angezeigt:

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_stat
    us":"enroute","passenger_count":1}
  2. Drücken Sie Ctrl+C, um den Stream zu stoppen.

Nachrichtenstream zu websocketd aufbauen

Nachdem Sie sich vergewissert haben, dass pulltop das Pub/Sub-Thema korrekt liest, können Sie den websocketd-Prozess starten, um Nachrichten an den Browser zu senden.

Nachrichten des Themas in einer lokalen Datei erfassen

In dieser Anleitung erfassen Sie den Nachrichtenstream von pulltop und schreiben ihn in eine lokale Datei. Für das Erfassen von Nachrichtenverkehr in einer lokalen Datei benötigen Sie zusätzlichen Speicher, andererseits wird dadurch der websocketd-Prozess vom Streaming der Pub/Sub-Nachrichten abgekoppelt. Die lokale Erfassung der Daten kann nützlich sein, wenn Sie das Pub/Sub-Streaming vorübergehend anhalten möchten (z. B. zum Einstellen von Parametern der Ablaufsteuerung), ohne ein Reset der aktuell verbundenen WebSocket-Clients erzwingen zu müssen. Nachdem der Nachrichtenstream wiederhergestellt ist, setzt websocketd das Streaming zu den Clients automatisch fort.

  1. Führen Sie auf der Instanz pulltop für das öffentliche Thema aus und leiten Sie die Nachrichtenausgabe in die lokale Datei taxi.json weiter. Mit dem Befehl nohup wird das Betriebssystem angewiesen, den pulltop-Prozess weiter auszuführen, wenn Sie sich abmelden oder das Terminal schließen.

    nohup pulltop \
      projects/pubsub-public-data/topics/taxirides-realtime > \
      /var/tmp/taxi.json &
    
  2. Prüfen Sie, ob in die Datei JSON-Nachrichten geschrieben werden:

    tail /var/tmp/taxi.json
    

    Wenn die Nachrichten in die Datei taxi.json geschrieben werden, sieht die Ausgabe etwa so aus:

    {"ride_id":"9729a68d-fcde-484b-bc32-bf29f5188628","point_idx":328,"latitude"
    :40.757360000000006,"longitude":-73.98228,"timestamp":"2019-03-22T20:03:51.6
    593-04:00","meter_reading":11.069151,"meter_increment":0.033747412,"ride_sta
    tus":"enroute","passenger_count":1}
  3. Wechseln Sie in den Webordner Ihrer Anwendung:

    cd ../web
    
  4. Starten Sie websocketd, um den Inhalt der lokalen Datei mit WebSockets zu streamen:

    nohup websocketd --port=8000 --staticdir=. tail -f /var/tmp/taxi.json &
    

    Damit wird der Befehl websocketd im Hintergrund ausgeführt. Das Tool websocketd erfasst die Ausgabe des Befehls tail und streamt die einzelnen Elemente als WebSocket-Nachrichten.

  5. Prüfen Sie den Inhalt von nohup.out, um zu ermitteln, ob der Server korrekt gestartet wurde:

    tail nohup.out
    

    Wenn alles korrekt funktioniert, sieht die Ausgabe etwa so aus:

    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving using application   : /usr/bin/tail -f /var/tmp/taxi.json
    Mon, 25 Mar 2019 14:03:53 -0400 | INFO   | server     |  | Serving static content from : .
    

Nachrichten visualisieren

Nachrichten mit Daten individueller Fahrten, die im Pub/Sub-Thema veröffentlicht werden, haben eine Struktur wie diese:

{
  "ride_id": "562127d7-acc4-4af9-8fdd-4eedd92b6e69",
  "point_idx": 248,
  "latitude": 40.74644000000001,
  "longitude": -73.97144,
  "timestamp": "2019-03-24T00:46:08.49094-04:00",
  "meter_reading": 8.40615,
  "meter_increment": 0.033895764,
  "ride_status": "enroute",
  "passenger_count": 1
}

Anhand dieser Werte können Sie verschiedene Messwerte für den Dashboard-Header berechnen. Die Berechnungen werden bei jedem eingehenden Fahrtereignis vorgenommen. Die Werte sind folgende:

  • Latenz der letzten Nachricht: Anzahl der Sekunden, die zwischen dem Zeitstempel des letzten beobachteten Fahrtereignisses und der aktuellen Zeit verstrichen sind (erfasst mit der Uhr des Systems, auf dem der Webbrowser gehostet wird).
  • Aktive Fahrten: Anzahl der aktuell laufenden Fahrten. Diese Zahl kann schnell steigen und sie nimmt ab, wenn der ride_status-Wert dropoff erfasst wird.
  • Nachrichtendurchsatz: Durchschnittliche Anzahl der pro Sekunde verarbeiteten Fahrtereignisse.
  • Gesamtstrecke: Summe der bei allen Fahrten zurückgelegten Meter. Diese Zahl nimmt ab, wenn Fahrten abgebrochen werden.
  • Fahrgäste gesamt: Anzahl der Fahrgäste aller Fahrten. Diese Zahl nimmt zu, wenn Fahrten abgeschlossen werden.
  • Durchschnittliche Fahrgastzahl pro Fahrt: Gesamtzahl der Fahrten, dividiert durch die Anzahl der Fahrgäste.
  • Durchschnittliche Strecke pro Fahrgast: Gesamtstrecke, dividiert durch die Anzahl der Fahrgäste.

Zusätzlich zu den Messwerten und den Fahrtbeispielen wird im Dashboard über dem Raster der Fahrtbeispiele eine Meldung eingeblendet, wenn ein Fahrgast ein- oder aussteigt.

  1. Rufen Sie die externe IP-Adresse der aktuellen Instanz ab:

    curl -H "Metadata-Flavor: Google" http://metadata/computeMetadata/v1/instance/network-interfaces/0/access-configs/0/external-ip; echo
    
    
  2. Kopieren Sie die IP-Adresse.

  3. Öffnen Sie auf Ihrem lokalen Computer einen Webbrowser und geben Sie diese URL ein:

    http://$ip-address:8000

    Sie sehen eine Seite mit dem Dashboard, das Sie mit dieser Anleitung erstellt haben:

    Dashboard, das mit dem Code aus dieser Anleitung erstellt wurde, mit der Anzeige einer Willkommensnachricht, bevor Daten angezeigt werden.

  4. Klicken Sie oben auf das Taxisymbol, um die Verbindung zum Stream herzustellen und mit der Verarbeitung der Nachrichten zu beginnen.

    Einzelne Fahrten werden als Auszug von jeweils neun aktiven Fahrten alle drei Sekunden angezeigt:

    Dashboard mit der Darstellung aktiver Fahrten.

    Sie können jederzeit auf das Taxisymbol klicken, um den WebSocket-Stream zu starten oder zu stoppen. Wird die WebSocket-Verbindung getrennt, wird das Symbol rot und die Aktualisierung der einzelnen Fahrten und Messwerte wird unterbrochen. Klicken Sie noch einmal auf das Taxisymbol, um die Verbindung wiederherzustellen.

Leistung

Der nachstehende Screenshot zeigt den Leistungsmonitor der Chrome-Entwicklertools, während im Browser-Tab rund 2.100 Nachrichten pro Sekunde verarbeitet werden.

Im Bereich der Browserleistung werden die CPU-Auslastung, die Größe des Heap-Speichers, die DOM-Knoten und die Stil-Neuberechnungen pro Sekunde angezeigt. Die Werte sind relativ niedrig.

Die Nachrichtenweiterleitung erfolgt mit einer Latenz von rund 30 ms und die CPU-Auslastung beträgt rund 80 %. Die Speicherauslastung hat einen Mindestwert von 29 MB bei einer Gesamtzuweisung von 57 MB und schwankt frei.

Bereinigen

Firewallregeln entfernen

Wenn Sie für diese Anleitung ein vorhandenes Projekt verwendet haben, können Sie die von Ihnen erstellten Firewallregeln entfernen. Offene Ports sollten immer möglichst gering gehalten werden.

  1. Löschen Sie die von Ihnen erstellte Firewallregel, um TCP auf Port 8000 zuzulassen:

    gcloud compute firewall-rules delete websocket
    
  2. Wenn Sie auch eine Firewallregel erstellt haben, die SSH-Verbindungen zulässt, löschen Sie die Firewallregel, um TCP auf Port 22 zuzulassen:

    gcloud compute firewall-rules delete wss-ssh
    

Projekt löschen

Wenn Sie dieses Projekt nicht mehr verwenden möchten, können Sie es löschen.

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Nächste Schritte