Auf dieser Seite erfahren Sie, wie Sie mit dem Spark Spanner Connector Daten mit Apache Spark aus Spanner lesen.
Kosten berechnen
In diesem Dokument verwenden Sie die folgenden kostenpflichtigen Komponenten von Google Cloud:
- Dataproc
- Spanner
- Cloud Storage
Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen.
Hinweise
Bevor Sie mit der Anleitung beginnen, müssen Sie die Connector-Version kennen und einen Connector-URI abrufen.
URI der JAR-Datei des Connectors angeben
Spark Spanner-Connector-Versionen sind im GitHub-Repository GoogleCloudDataproc/spark-spanner-connector aufgeführt.
Geben Sie die JAR-Datei des Connectors an, indem Sie die Informationen zur Connectorversion im folgenden URI-String ersetzen:
gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
Der Connector ist für Spark-Versionen 3.1+
Beispiel für die gcloud CLI:
gcloud dataproc jobs submit spark \ --jars=gs://spark-lib/spanner/spark-3.1-spanner-1.0.0.jar \ -- job-args
Spanner-Datenbank vorbereiten
Wenn Sie noch keine Spanner-Tabelle haben, können Sie dieser Anleitung folgen, um eine zu erstellen. Danach haben Sie eine Instanz-ID, eine Datenbank-ID und eine Tabelle Singers
.
Dataproc-Cluster erstellen
Jeder Dataproc-Cluster, der den Connector verwendet, benötigt die Bereiche spanner
oder cloud-platform
. Dataproc-Cluster haben den Standardbereich cloud-platform
für Image 2.1 oder höher. Wenn Sie eine ältere Version verwenden, können Sie mit der Google Cloud Console, der Google Cloud CLI und der Dataproc API einen Dataproc-Cluster erstellen.
Console
- Öffnen Sie in der Google Cloud Console die Dataproc-Seite Cluster erstellen.
- Klicken Sie auf dem Tab „Sicherheit verwalten“ im Bereich „Projektzugriff“ auf „Aktiviert den Bereich ‚cloud-platform‘ für diesen Cluster“.
- Füllen Sie die restlichen Felder für die Clustererstellung aus oder bestätigen Sie sie und klicken Sie dann auf „Erstellen“.
Google Cloud CLI
gcloud dataproc clusters create CLUSTER_NAME --scopes https://www.googleapis.com/auth/cloud-platform
API
Sie können GceClusterConfig.serviceAccountScopes als Teil einer clusters.create-Anfrage angeben. Beispiel:"serviceAccountScopes": ["https://www.googleapis.com/auth/cloud-platform"],
Achten Sie darauf, dass dem Dataproc-VM-Dienstkonto die entsprechende Spanner-Berechtigung zugewiesen ist. Wenn Sie in der Anleitung Data Boost verwenden, lesen Sie den Hilfeartikel IAM-Berechtigung für Data Boost.
Daten aus Spanner lesen
Mit Scala und Python können Sie mithilfe der Spark-Datenquellen-API Daten aus Spanner in einen Spark-DataFrame lesen.
Scala
- Prüfen Sie den Code und ersetzen Sie die Platzhalter [projectId], [instanceId], [databaseId] und [table] durch die Projekt-ID, die Instanz-ID, die Datenbank-ID und die Tabelle, die Sie zuvor erstellt haben. Mit der Option „enableDataBoost“ wird die Data Boost-Funktion von Spanner aktiviert, die nahezu keine Auswirkungen auf die Haupt-Spanner-Instanz hat.
object singers { def main(): Unit = { /* * Remove comment if you are not running in spark-shell. * import org.apache.spark.sql.SparkSession val spark = SparkSession.builder() .appName("spark-spanner-demo") .getOrCreate() */ // Load data in from Spanner. See // https://github.com/GoogleCloudDataproc/spark-spanner-connector/blob/main/README.md#properties // for option information. val singersDF = (spark.read.format("cloud-spanner") .option("projectId", "[projectId]") .option("instanceId", "[instanceId]") .option("databaseId", "[databaseId]") .option("enableDataBoost", true) .option("table", "[table]") .load() .cache()) singersDF.createOrReplaceTempView("Singers") // Load the Singers table. val result = spark.sql("SELECT * FROM Singers") result.show() result.printSchema() } }
- Code in einem Cluster ausführen
- SSH-Verbindung zum Masterknoten des Dataproc-Clusters herstellen
- Rufen Sie in der Google Cloud Console die Seite Dataproc-Cluster auf und klicken Sie auf den Namen Ihres Clusters.
- Wählen Sie auf der Seite > Clusterdetails den Tab „VM-Instanzen“ aus. Klicken Sie dann rechts neben dem Namen des Cluster-Masterknotens auf
SSH
.
Ein Browserfenster wird in Ihrem Basisverzeichnis auf dem Masterknoten geöffnet.Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Erstellen Sie
singers.scala
mit dem vorinstallierten Texteditorvi
,vim
odernano
und fügen Sie dann den Scala-Code aus der Scala-Code-Liste ein.nano singers.scala
- Starten Sie die
spark-shell
-REPL.$ spark-shell --jars=gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar
- Führen Sie singers.scala mit dem Befehl
:load singers.scala
aus, um die Spanner-Singers
-Tabelle zu erstellen. Die Ausgabeliste zeigt Beispiele aus der Ausgabe von „Sänger“ an.> :load singers.scala Loading singers.scala... defined object singers > singers.main() ... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true)
PySpark
- Prüfen Sie den Code und ersetzen Sie die Platzhalter [projectId], [instanceId], [databaseId] und [table] durch die Projekt-ID, die Instanz-ID, die Datenbank-ID und die Tabelle, die Sie zuvor erstellt haben. Mit der Option „enableDataBoost“ wird die Data Boost-Funktion von Spanner aktiviert, die nahezu keine Auswirkungen auf die Haupt-Spanner-Instanz hat.
#!/usr/bin/env python """Spanner PySpark read example.""" from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .master('yarn') \ .appName('spark-spanner-demo') \ .getOrCreate() # Load data from Spanner. singers = spark.read.format('cloud-spanner') \ .option("projectId", "[projectId]") \ .option("instanceId", "[instanceId]") \ .option("databaseId", "[databaseId]") \ .option("enableDataBoost", "true") \ .option("table", "[table]") \ .load() singers.createOrReplaceTempView('Singers') # Read from Singers result = spark.sql('SELECT * FROM Singers') result.show() result.printSchema()
- Code in Ihrem Cluster ausführen
- SSH-Verbindung zum Masterknoten des Dataproc-Clusters herstellen
- Rufen Sie in der Google Cloud Console die Seite Dataproc-Cluster auf und klicken Sie auf den Namen Ihres Clusters.
- Wählen Sie auf der Seite Clusterdetails den Tab „VM-Instanzen“ aus. Klicken Sie dann rechts neben dem Namen des Cluster-Masterknotens auf
SSH
.
Ein Browserfenster wird in Ihrem Basisverzeichnis auf dem primären Knoten geöffnet.Connected, host fingerprint: ssh-rsa 2048 ... ... user@clusterName-m:~$
- Erstellen Sie
singers.py
mit dem vorinstallierten Texteditorvi
,vim
odernano
und fügen Sie dann den PySpark-Code aus der PySpark-Codeliste ein.nano singers.py
- Führen Sie singers.py mit
spark-submit
aus, um die Spanner-Singers
-Tabelle zu erstellen. Die Ausgabe sieht so aus:spark-submit --jars gs://spark-lib/spanner/spark-3.1-spanner-CONNECTOR_VERSION.jar singers.py
... +--------+---------+--------+---------+-----------+ |SingerId|FirstName|LastName|BirthDate|LastUpdated| +--------+---------+--------+---------+-----------+ | 1| Marc|Richards| null| null| | 2| Catalina| Smith| null| null| | 3| Alice| Trentor| null| null| +--------+---------+--------+---------+-----------+ root |-- SingerId: long (nullable = false) |-- FirstName: string (nullable = true) |-- LastName: string (nullable = true) |-- BirthDate: date (nullable = true) |-- LastUpdated: timestamp (nullable = true) only showing top 20 rows
- SSH-Verbindung zum Masterknoten des Dataproc-Clusters herstellen
Bereinigen
Mit den folgenden Schritten vermeiden Sie, dass Ihrem Google Cloud-Konto die in dieser Schritt-für-Schritt-Anleitung erstellten Ressourcen laufend in Rechnung gestellt werden:
gcloud dataproc clusters stop CLUSTER_NAME gcloud dataproc clusters delete CLUSTER_NAME