Auf dieser Seite wird beschrieben, wie Sie den Dataflow-Connector für Spanner verwenden, um Daten in Spanner-Datenbanken mit GoogleSQL-Dialekt und PostgreSQL-Dialekt zu importieren, zu exportieren und zu ändern.
Dataflow ist ein verwalteter Dienst für die Transformation und Anreicherung von Daten. Mit dem Dataflow-Connector für Spanner können Sie über eine Dataflow-Pipeline Daten in Spanner lesen und schreiben. Optional können Sie die Daten dabei transformieren oder ändern. Sie können auch Pipelines erstellen, mit denen Daten zwischen Spanner und anderenGoogle Cloud -Produkten übertragen werden.
Der Dataflow-Connector ist die empfohlene Methode zur effizienten Bulk-Verschiebung von Daten in und aus Spanner. Sie ist auch die empfohlene Methode für die Durchführung großer Transformationen an einer Datenbank, die von partitionierter DML nicht unterstützt werden, z. B. Tabellenverschiebungen und Bulk-Löschvorgänge, für die ein JOIN erforderlich ist. Wenn Sie mit einzelnen Datenbanken arbeiten, können Sie die Daten mit anderen Methoden importieren und exportieren:
- Verwenden Sie die Google Cloud Console, um eine einzelne Datenbank im Avro aus Spanner nach Cloud Storage zu exportieren.
- Verwenden Sie die Google Cloud Console, um eine Datenbank aus Dateien, die Sie nach Cloud Storage exportiert haben, zurück in Spanner zu importieren.
- Verwenden Sie die REST API oder die Google Cloud CLI, um Export- oder Importjobs von Spanner nach Cloud Storage und umgekehrt auszuführen (ebenfalls mit dem Avro-Format).
Der Dataflow-Connector für Spanner ist Teil des Apache Beam Java SDK und bietet eine API für die oben genannten Vorgänge. Weitere Informationen zu einigen der auf dieser Seite erläuterten Konzepte wie PCollection
-Objekte und -Transformationen finden Sie in der Apache Beam-Programmieranleitung.
Connector dem Maven-Projekt hinzufügen
Wenn Sie den Google Cloud Dataflow-Connector einem Maven-Projekt hinzufügen möchten, dann fügen Sie das Maven-Artefakt beam-sdks-java-io-google-cloud-platform
als Abhängigkeit in die Datei pom.xml
ein:
Angenommen, Ihre Datei pom.xml
setzt die Version von beam.version
auf die entsprechende Versionsnummer. In diesem Fall fügen Sie die folgende Abhängigkeit hinzu:
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam.version}</version>
</dependency>
Daten aus Spanner lesen
Um aus Spanner zu lesen, wenden Sie die Transformation SpannerIO.read
an. Konfigurieren Sie den Lesevorgang mit den Methoden aus der Klasse SpannerIO.Read
. Durch Einsatz der Transformation wird das Objekt PCollection<Struct>
zurückgegeben. Dabei stellt jedes Element in der Sammlung eine einzelne Zeile dar, die vom Lesevorgang zurückgegeben wird. Je nach gewünschter Ausgabe können Sie Daten mit einer bestimmten SQL-Abfrage oder ohne SQL-Abfrage aus Cloud Spanner lesen.
Durch Einsatz der Transformation SpannerIO.read
wird mithilfe eines starken Lesevorgangs eine konsistente Datenansicht zurückgegeben. Sofern Sie dies nicht anders festlegen, beruht das Ergebnis des Lesevorgangs auf einem Snapshot der Daten zu dem Zeitpunkt, an dem Sie den Lesevorgang gestartet haben. Weitere Informationen zu den verschiedenen Arten von Lesevorgängen, die mit Spanner möglich sind, finden Sie unter Lesevorgänge.
Daten mithilfe einer Abfrage lesen
Wenn Sie bestimmte Daten aus Spanner lesen möchten, konfigurieren Sie die Transformation mit der Methode SpannerIO.Read.withQuery
, um eine SQL-Abfrage anzugeben. Beispiel:
Daten lesen, ohne eine Abfrage anzugeben
Wenn Sie ohne eine Abfrage aus einer Datenbank lesen möchten, können Sie mit der Methode SpannerIO.Read.withTable
einen Tabellennamen und mit der Methode SpannerIO.Read.withColumns
eine Liste der zu lesenden Spalten angeben. Beispiel:
GoogleSQL
PostgreSQL
Wenn Sie die Anzahl der gelesenen Zeilen begrenzen möchten, können Sie mit der Methode SpannerIO.Read.withKeySet
eine Reihe von Primärschlüsseln angeben, die gelesen werden sollen.
Sie können eine Tabelle auch mit einem angegebenen sekundären Index lesen. Wie beim readUsingIndex
-API-Aufruf muss der Index alle Daten enthalten, die in den Abfrageergebnissen angezeigt werden.
Geben Sie dazu die Tabelle wie im vorherigen Beispiel an und geben Sie den Index an, der die benötigten Spaltenwerte enthält, indem Sie die Methode SpannerIO.Read.withIndex
verwenden. Im Index müssen alle Spalten gespeichert werden, die von der Transformation gelesen werden müssen. Der Primärschlüssel der Basistabelle wird implizit gespeichert. Wenn Sie beispielsweise die Tabelle Songs
mit dem Index SongsBySongName
lesen möchten, verwenden Sie den folgenden Code:
GoogleSQL
PostgreSQL
Veralterung von Transaktionsdaten steuern
Eine Transformation wird immer auf einen konsistenten Daten-Snapshot angewendet. Verwenden Sie die Methode SpannerIO.Read.withTimestampBound
, um die Veralterung der Daten zu steuern. Weitere Informationen finden Sie unter Transaktionen.
Lesen aus mehreren Tabellen mit der gleichen Transaktion
Wenn Sie Daten aus mehreren Tabellen zur gleichen Zeit lesen möchten, damit Sie so die Datenkonsistenz gewährleisten können, führen Sie alle Lesevorgänge mit einer einzigen Transaktion aus. Verwenden Sie dazu die Transformation createTransaction
und erstellen Sie ein Objekt PCollectionView<Transaction>
, mit dem wiederum eine Transaktion erstellt wird. Die entstehende Ansicht kann mit SpannerIO.Read.withTransaction
an einen Lesevorgang übergeben werden.
GoogleSQL
PostgreSQL
Daten aus allen verfügbaren Tabellen lesen
Sie können Daten aus allen verfügbaren Tabellen in einer Spanner-Datenbank lesen.
GoogleSQL
PostgreSQL
Fehlerbehebung bei nicht unterstützten Abfragen
Der Dataflow-Connector unterstützt nur Spanner-SQL-Abfragen, bei denen der erste Operator im Abfrageausführungsplan Distributed Union ist. Wenn Sie versuchen, Daten aus Spanner mit einer Abfrage zu lesen und die Ausnahmemeldung does not have a
DistributedUnion at the root
erhalten, führen Sie die unter Informationen zur Ausführung von Abfragen in Cloud Spanner angegebenen Schritte aus, damit Sie mit der Google Cloud Console einen Ausführungsplan für die Abfrage abrufen können.
Wenn die SQL-Abfrage nicht unterstützt wird, vereinfachen Sie sie entsprechend, damit sie als ersten Operator im Abfrageausführungsplan den Operator "Distributed Union" enthält. Entfernen Sie Aggregatfunktionen, Tabellen-Joins sowie die Operatoren DISTINCT
, GROUP BY
und ORDER
, da diese Operatoren am ehesten verhindern, dass die Abfrage funktioniert.
Mutationen für einen Schreibvorgang erstellen
Verwenden Sie die Methode newInsertOrUpdateBuilder
der Klasse Mutation
anstelle der Methode newInsertBuilder
, sofern diese nicht unbedingt für Java-Pipelines erforderlich ist. Verwenden Sie für Python-Pipelines SpannerInsertOrUpdate
anstelle von SpannerInsert
. Dataflow garantiert die mindestens einmalige Ausführung. Dies bedeutet, dass die Mutation möglicherweise mehrmals geschrieben wird. Daher führen nur INSERT
-Mutationen möglicherweise zu com.google.cloud.spanner.SpannerException: ALREADY_EXISTS
-Fehlern, die einen Ausfall der Pipeline verursachen. Verwenden Sie stattdessen die Mutation INSERT_OR_UPDATE
, um diesen Fehler zu vermeiden. Damit wird eine neue Zeile hinzugefügt oder Spaltenwerte aktualisiert, falls die Zeile bereits vorhanden ist. Die Mutation INSERT_OR_UPDATE
kann mehrmals angewendet werden.
In Spanner schreiben und Daten transformieren
Mit dem Dataflow-Connector können Sie Daten in Spanner schreiben. Verwenden Sie hierfür die Transformation SpannerIO.write
, um eine Sammlung von Eingabezeilenmutationen auszuführen. Für eine höhere Effizienz gruppiert der Dataflow-Connector Mutationen in Batches.
Das folgende Beispiel zeigt, wie eine Schreibtransformation auf eine PCollection
von Mutationen angewendet wird:
GoogleSQL
PostgreSQL
Wenn eine Transformation vor dem Abschluss unerwartet abgebrochen wird, werden die bereits ausgeführten Mutationen nicht zurückgesetzt.
Gruppen von Mutationen untrennbar ausführen
Sie können mit der Klasse MutationGroup
eine Gruppe von Mutationen untrennbar zusammen ausführen. Mutationen in einer MutationGroup
werden garantiert in der gleichen Transaktion übertragen, aber die Transaktion wird möglicherweise wiederholt.
Mutationsgruppen funktionieren am besten, wenn sie zur Zusammenfassung von Mutationen verwendet werden, die Auswirkungen auf im Schlüsselbereich nahe beieinander gespeicherte Daten haben. Da Spanner Daten aus übergeordneten und untergeordneten Tabellen in der übergeordneten Tabelle miteinander verschachtelt, befinden sich diese Daten im Schlüsselbereich immer nahe beieinander. Wir empfehlen, die Mutationsgruppe entweder so zu strukturieren, dass sie eine auf eine übergeordnete Tabelle angewendete Mutation sowie zusätzliche auf untergeordnete Tabellen angewendete Mutationen enthält oder dass sämtliche Mutationen im Schlüsselbereich nahe beieinander liegende Daten ändern. Weitere Informationen dazu, wie Spanner Daten von über- und untergeordneten Tabellen speichert, finden Sie unter Schema und Datenmodell. Wenn Sie die Mutationsgruppen nicht anhand der empfohlenen Tabellenhierarchien strukturieren oder wenn sich die Daten, auf die zugegriffen wird, im Schlüsselbereich nicht nahe beieinander befinden, muss Spanner möglicherweise Zwei-Phasen-Commits durchführen. Das führt zu einer geringeren Leistung. Weitere Informationen finden Sie unter Kompromisse bei der Lokalität.
Wenn Sie eine MutationGroup
verwenden möchten, erstellen Sie eine SpannerIO.write
-Transformation und rufen Sie die Methode SpannerIO.Write.grouped
auf. Diese gibt eine Transformation zurück, die Sie dann auf eine PCollection
von Objekten des Typs MutationGroup
anwenden können.
Beim Erstellen einer MutationGroup
wird die erste aufgelistete Mutation zur primären Mutation. Wenn Ihre Mutationsgruppe sowohl eine übergeordnete als auch eine untergeordnete Tabelle betrifft, sollte die primäre Mutation eine Mutation der übergeordneten Tabelle sein. Andernfalls können Sie jede Mutation als primäre Mutation verwenden. Der Cloud Dataflow-Connector verwendet die primäre Mutation, um Partitionsgrenzen zu bestimmen, damit Mutationen effizient zusammengefasst werden können.
Stellen Sie sich beispielsweise vor, dass Ihre Anwendung das Verhalten der Nutzer überwacht und problematische Fälle kennzeichnet, damit diese überprüft werden. Für jeden gekennzeichneten Fall des Nutzerverhaltens möchten Sie die Tabelle Users
aktualisieren, damit der Nutzer nicht mehr auf Ihre Anwendung zugreifen kann. Außerdem wollen Sie den Vorfall in der Tabelle PendingReviews
aufzeichnen. Verwenden Sie eine MutationGroup
, um dafür zu sorgen, dass beide Tabellen untrennbar voneinander aktualisiert werden:
GoogleSQL
PostgreSQL
Beim Erstellen einer Mutationsgruppe wird die erste Mutation, die als Argument angegeben ist, zur primären Mutation. In diesem Fall haben die beiden Tabellen keinen Bezug zueinander, also gibt es keine eindeutige primäre Mutation. Wir haben uns für userMutation
als primäre Mutation entschieden und haben sie an die erste Stelle gesetzt. Beide Mutationen separat auszuführen wäre zwar schneller, würde aber die Untrennbarkeit nicht garantieren, weshalb die Mutationsgruppe in dieser Situation die beste Wahl darstellt.
Weitere Informationen
- Apache Beam-Datenpipeline entwerfen
- Exportieren und Importieren von Spanner-Datenbanken in derGoogle Cloud console mit Dataflow