Änderungen mit Dataflow streamen

Mit dem Bigtable-Beam-Connector können Sie Dataflow verwenden, um Bigtable-Datenänderungseinträge zu lesen, ohne Partitionsänderungen in Ihrem Code verfolgen oder verarbeiten zu müssen, da der Connector diese Logik für Sie übernimmt.

In diesem Dokument wird beschrieben, wie Sie den Bigtable Beam-Connector konfigurieren und verwenden, um einen Änderungsstream mit einer Dataflow-Pipeline zu lesen. Bevor Sie dieses Dokument lesen, sollten Sie sich die Übersicht über Änderungsstreams ansehen und mit Dataflow vertraut sein.

Alternativen zum Erstellen einer eigenen Pipeline

Wenn Sie keine eigene Dataflow-Pipeline erstellen möchten, können Sie eine der folgenden Optionen verwenden.

Sie können eine von Google bereitgestellte Dataflow-Vorlage verwenden.

Sie können auch die Codebeispiele aus dem Bigtable-Tutorial oder der Bigtable-Kurzanleitung als Ausgangspunkt für Ihren Code verwenden.

Achten Sie darauf, dass der generierte Code google cloud libraries-bom-Version 26.14.0 oder höher verwendet.

Connector-Details

Mit der Bigtable Beam-Connector-Methode BigtableIO.readChangeStream können Sie einen Stream von Datensatzänderungen (ChangeStreamMutation) lesen, die Sie verarbeiten können. Der Bigtable Beam Connector ist eine Komponente des Apache Beam GitHub-Repositorys. Eine Beschreibung des Connector-Codes finden Sie in den Kommentaren unter BigtableIO.java.

Sie müssen den Connector mit Beam-Version 2.48.0 oder höher verwenden. Prüfen Sie unter Unterstützung von Apache Beam-Laufzeiten, ob Sie eine unterstützte Version von Java verwenden. Anschließend können Sie eine Pipeline bereitstellen, die den Connector für Dataflow verwendet. Dataflow übernimmt die Bereitstellung und Verwaltung von Ressourcen und unterstützt die Skalierbarkeit und Zuverlässigkeit der Verarbeitung von Streamingdaten.

Weitere Informationen zum Apache Beam-Programmiermodell finden Sie in der Beam-Dokumentation.

Daten ohne Ereigniszeiten gruppieren

Mit dem Bigtable-Beam-Connector gestreamte Datensatzänderungseinträge sind nicht mit Dataflow-Funktionen kompatibel, die von Ereigniszeiten abhängen.

Wie unter Replikation und Watermarks beschrieben, wird ein niedriges Watermark möglicherweise nicht aktualisiert, wenn die Replikation für die Partition nicht mit dem Rest der Instanz Schritt gehalten hat. Wenn ein niedriges Wasserzeichen nicht mehr weiterläuft, kann dies dazu führen, dass der Änderungsstream ins Stocken gerät.

Damit der Stream nicht ins Stocken gerät, gibt der Bigtable-Beam-Connector alle Daten mit einem Ausgabetimestamp von null aus. Durch den Zeitstempel „0“ betrachtet Dataflow alle Datensatzänderungen als späte Daten. Daher sind Dataflow-Funktionen, die von Ereigniszeiten abhängen, nicht mit Bigtable-Änderungsstreams kompatibel. Insbesondere können Sie keine Fensterfunktionen, Ereigniszeit-Trigger oder Ereigniszeit-Timer verwenden.

Stattdessen können Sie GlobalWindows mit Triggern verwenden, die nicht auf der Ereigniszeit basieren, um diese verspäteten Daten in Bereiche zu gruppieren. Ein Beispiel dafür finden Sie in der Anleitung. Weitere Informationen zu Triggern und Bereichen finden Sie in der Beam-Programmieranleitung unter „Trigger“.

Autoscaling

Der Connector unterstützt Dataflow-Autoscaling, das standardmäßig aktiviert ist, wenn Runner v2 (erforderlich) verwendet wird. Der Dataflow-Autoscaling-Algorithmus berücksichtigt den geschätzten Rückstand des Änderungsstreams, der im Bereich Backlog auf der Seite Dataflow-Monitoring überwacht werden kann. Verwenden Sie das Flag --maxNumWorkers, wenn Sie einen Job bereitstellen, um die Anzahl der Worker zu begrenzen.

Wenn Sie Ihre Pipeline manuell skalieren möchten, anstatt Autoscaling zu verwenden, lesen Sie den Abschnitt Streamingpipeline manuell skalieren.

Beschränkungen

Beachten Sie die folgenden Einschränkungen, bevor Sie den Bigtable Beam-Connector mit Dataflow verwenden.

Dataflow Runner V2

Der Connector kann nur mit Dataflow Runner v2 ausgeführt werden. Geben Sie --experiments=use_runner_v2 in den Befehlszeilenargumenten an, um diese Funktion zu aktivieren. Wenn Sie Runner v1 verwenden, schlägt Ihre Pipeline mit der folgenden Ausnahme fehl:

java.lang.UnsupportedOperationException: BundleFinalizer unsupported by non-portable Dataflow

Snapshots

Der Connector unterstützt keine Dataflow-Snapshots.

Duplikate

Der Bigtable-Beam-Connector streamt Änderungen für jeden Zeilenschlüssel und jeden Cluster in der Reihenfolge der Commit-Zeitstempel. Da er jedoch manchmal von früheren Zeitpunkten im Stream neu startet, kann er Duplikate erzeugen.

Neustarts von Pipelines

Wenn eine Dataflow-Pipeline längere Zeit nicht ausgeführt wurde, können Datensatzänderungen hinter die Aufbewahrungsgrenze fallen. Wenn die Pipeline fortgesetzt wird, schlägt sie in Bigtable fehl, damit Sie eine neue Pipeline mit einer neuen Startzeit für die Anfrage starten können, die innerhalb des Aufbewahrungszeitraums liegt. Bigtable tut dies, anstatt die Anfragezeit der ursprünglichen Pipeline im Hintergrund voranzutreiben, um zu verhindern, dass Datensatzänderungen mit Zeitstempeln, die außerhalb des angegebenen Aufbewahrungszeitraums liegen, unbeabsichtigt gelöscht werden.

Hinweise

Bevor Sie den Connector verwenden, müssen Sie die folgenden Voraussetzungen erfüllen.

Authentifizierung einrichten

Wenn Sie die Java -Beispiele auf dieser Seite in einer lokalen Entwicklungsumgebung verwenden möchten, installieren und initialisieren Sie die gcloud CLI und richten Sie dann die Standardanmeldedaten für Anwendungen mit Ihren Nutzeranmeldedaten ein.

    Installieren Sie die Google Cloud CLI.

    Wenn Sie einen externen Identitätsanbieter (IdP) verwenden, müssen Sie sich zuerst mit Ihrer föderierten Identität in der gcloud CLI anmelden.

    If you're using a local shell, then create local authentication credentials for your user account:

    gcloud auth application-default login

    You don't need to do this if you're using Cloud Shell.

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

Weitere Informationen finden Sie unter Set up authentication for a local development environment.

Informationen zum Einrichten der Authentifizierung für eine Produktionsumgebung finden Sie unter Set up Application Default Credentials for code running on Google Cloud.

Änderungsstream aktivieren

Sie müssen einen Änderungsstream für eine Tabelle aktivieren, bevor Sie sie lesen können. Sie können auch eine neue Tabelle mit aktivierten Änderungsstreams erstellen.

Stream-Metadatentabelle ändern

Wenn Sie Änderungen mit Dataflow streamen, erstellt der Bigtable Beam-Connector standardmäßig eine Metadatentabelle mit dem Namen __change_stream_md_table. In der Metadatentabelle für Änderungsstreams wird der Betriebsstatus des Connectors verwaltet und Metadaten zu Datensatzänderungen gespeichert.

Standardmäßig wird die Tabelle vom Connector in derselben Instanz wie die Tabelle erstellt, die gestreamt wird. Damit die Tabelle richtig funktioniert, muss das Anwendungsprofil für die Metadatentabelle Single-Cluster-Routing verwenden und Transaktionen für einzelne Zeilen aktiviert haben.

Weitere Informationen zum Streamen von Änderungen aus Bigtable mit dem Bigtable Beam-Connector finden Sie in der BigtableIO-Dokumentation.

Erforderliche Rollen

Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Lesen eines Bigtable-Änderungsstreams mit Dataflow benötigen.

Zum Lesen der Änderungen aus Bigtable benötigen Sie diese Rolle:

  • Bigtable-Administrator (roles/bigtable.admin) für die Bigtable-Instanz, die die Tabelle enthält, aus der Sie Änderungen streamen möchten

Zum Ausführen des Dataflow-Jobs benötigen Sie die folgenden Rollen:

Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff verwalten.

Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.

Bigtable Beam-Connector als Abhängigkeit hinzufügen

Fügen Sie Ihrer Maven-Datei „pom.xml“ Code hinzu, der der folgenden Abhängigkeit ähnelt. Die Version muss 2.48.0 oder höher sein.

<dependencies>
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
    <version>VERSION</version>
  </dependency>
</dependencies>

Änderungsstream lesen

Wenn Sie eine Dataflow-Pipeline zum Lesen Ihrer Datensatzänderungen erstellen möchten, konfigurieren Sie den Connector und fügen dann Transformationen und Senken hinzu. Anschließend verwenden Sie den Connector, um ChangeStreamMutation-Objekte in einer Beam-Pipeline zu lesen.

Die Codebeispiele in diesem Abschnitt sind in Java geschrieben und zeigen, wie Sie eine Pipeline erstellen und damit Schlüssel/Wert-Paare in einen String umwandeln. Jedes Paar besteht aus einem Zeilenschlüssel und einem ChangeStreamMutation-Objekt. In der Pipeline werden die Einträge jedes Objekts in einen durch Kommas getrennten String umgewandelt.

Pipeline erstellen

Dieses Java-Codebeispiel zeigt, wie die Pipeline erstellt wird:

BigtableOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(BigtableOptions.class);
Pipeline p = Pipeline.create(options);

final Instant startTime = Instant.now();

p.apply(
        "Read Change Stream",
        BigtableIO.readChangeStream()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId())
            .withAppProfileId(options.getBigtableAppProfile())
            .withStartTime(startTime))
    .apply(
        "Flatten Mutation Entries",
        FlatMapElements.into(TypeDescriptors.strings())
            .via(ChangeStreamsHelloWorld::mutationEntriesToString))
    .apply(
        "Print mutations",
        ParDo.of(
            new DoFn<String, Void>() { // a DoFn as an anonymous inner class instance
              @ProcessElement
              public void processElement(@Element String mutation) {
                System.out.println("Change captured: " + mutation);
              }
            }));
p.run();

Datensatzänderungen verarbeiten

In diesem Beispiel wird gezeigt, wie Sie alle Einträge in einem Datensatz für Datenänderungen für eine Zeile durchlaufen und eine Methode zum Konvertieren in einen String basierend auf dem Eintragstyp aufrufen.

Eine Liste der Eintragstypen, die ein Datensatz für Datenänderungen enthalten kann, finden Sie unter Inhalt eines Datensatzes für Datenänderungen.

static List<String> mutationEntriesToString(KV<ByteString, ChangeStreamMutation> mutationPair) {
  List<String> mutations = new ArrayList<>();
  String rowKey = mutationPair.getKey().toStringUtf8();
  ChangeStreamMutation mutation = mutationPair.getValue();
  MutationType mutationType = mutation.getType();
  for (Entry entry : mutation.getEntries()) {
    if (entry instanceof SetCell) {
      mutations.add(setCellToString(rowKey, mutationType, (SetCell) entry));
    } else if (entry instanceof DeleteCells) {
      mutations.add(deleteCellsToString(rowKey, mutationType, (DeleteCells) entry));
    } else if (entry instanceof DeleteFamily) {
      // Note: DeleteRow mutations are mapped into one DeleteFamily per-family
      mutations.add(deleteFamilyToString(rowKey, mutationType, (DeleteFamily) entry));
    } else {
      throw new RuntimeException("Entry type not supported.");
    }
  }
  return mutations;
}

In diesem Beispiel wird ein write-Eintrag konvertiert:

private static String setCellToString(String rowKey, MutationType mutationType, SetCell setCell) {
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "SetCell",
          setCell.getFamilyName(),
          setCell.getQualifier().toStringUtf8(),
          setCell.getValue().toStringUtf8());
  return String.join(",", mutationParts);
}

In diesem Beispiel wird ein Eintrag vom Typ deletion of cells (Löschen von Zellen) konvertiert:

private static String deleteCellsToString(
    String rowKey, MutationType mutationType, DeleteCells deleteCells) {
  String timestampRange =
      deleteCells.getTimestampRange().getStart() + "-" + deleteCells.getTimestampRange().getEnd();
  List<String> mutationParts =
      Arrays.asList(
          rowKey,
          mutationType.name(),
          "DeleteCells",
          deleteCells.getFamilyName(),
          deleteCells.getQualifier().toStringUtf8(),
          timestampRange);
  return String.join(",", mutationParts);
}

In diesem Beispiel wird ein Eintrag für das Löschen einer Spaltenfamilie konvertiert:


private static String deleteFamilyToString(
    String rowKey, MutationType mutationType, DeleteFamily deleteFamily) {
  List<String> mutationParts =
      Arrays.asList(rowKey, mutationType.name(), "DeleteFamily", deleteFamily.getFamilyName());
  return String.join(",", mutationParts);
}

Überwachen

Mit den folgenden Ressourcen in der Google Cloud -Konsole können Sie IhreGoogle Cloud -Ressourcen überwachen, während Sie eine Dataflow-Pipeline zum Lesen eines Bigtable-Änderungsstreams ausführen:

Prüfen Sie insbesondere die folgenden Messwerte:

  • Prüfen Sie auf der Seite „Bigtable-Systemstatistiken“ die folgenden Messwerte:
    • CPU-Auslastung nach Änderungsstreams-Daten im Messwert cpu_load_by_app_profile_by_method_by_table. Hier sehen Sie, wie sich der Änderungsstream auf die CPU-Auslastung Ihres Clusters auswirkt.
    • Änderungsstream-Speicherauslastung (Byte) (change_stream_log_used_bytes).
  • Prüfen Sie auf der Seite „Dataflow Monitoring“ die Datenaktualität. Dieser Messwert gibt die Differenz zwischen der aktuellen Zeit und dem Wasserzeichen an. Sie beträgt in der Regel etwa zwei Minuten, mit gelegentlichen Spitzen, die ein oder zwei Minuten länger sind. Die Datenaktualität gibt nicht an, ob Datensatzänderungen langsam verarbeitet werden. Um den kontinuierlichen Zustand und die Leistung Ihrer kritischen Anwendungen zu gewährleisten, sollten Sie den Messwert „Dataflow-Datenaktualität“ im Blick behalten und die folgenden Maßnahmen ergreifen:

    • Wenn der Messwert für die Datenaktualität konstant höher als der Schwellenwert ist, sind für Ihre Pipeline möglicherweise nicht genügend Ressourcen vorhanden. Wir empfehlen, weitere Dataflow-Worker hinzuzufügen.
    • Wenn die Dataflow-Worker gut bereitgestellt sind, die Datenaktualität jedoch zugenommen hat oder konstant hoch ist, wenden Sie sich an den Google Cloud Support.
  • Mit dem Dataflow-Messwert processing_delay_from_commit_timestamp_MEAN können Sie die durchschnittliche Verarbeitungszeit von Datensatzänderungen über die gesamte Lebensdauer des Jobs hinweg ermitteln.

Die Bigtable-Messwert server/latencies ist nicht hilfreich, wenn Sie eine Dataflow-Pipeline überwachen, die einen Bigtable-Änderungsstream liest, da er die Dauer der Streaminganfrage und nicht die Latenz bei der Verarbeitung von Datenänderungsdatensätzen widerspiegelt. Eine hohe Latenz in einem Änderungsstream bedeutet nicht, dass die Anfragen langsam verarbeitet werden, sondern dass die Verbindung so lange geöffnet war.

Nächste Schritte