Bigtable Spark-Connector verwenden
Mit dem Bigtable Spark-Connector können Sie Daten in Bigtable lesen und schreiben. Sie können Daten aus Ihrer Spark-Anwendung mit Spark SQL und DataFrames lesen. Die folgenden Bigtable-Vorgänge werden mit dem Bigtable Spark-Connector unterstützt:
- Daten schreiben
- Daten lesen
- Neue Tabelle erstellen
In diesem Dokument wird beschrieben, wie Sie eine Spark SQL-DataFrames-Tabelle in eine Bigtable-Tabelle konvertieren und dann eine JAR-Datei kompilieren und erstellen, um einen Spark-Job zu übergeben.
Supportstatus für Spark und Scala
Der Bigtable Spark-Connector unterstützt die folgenden Scala-Versionen:
Der Bigtable Spark-Connector unterstützt die folgenden Spark-Versionen:
Der Bigtable Spark-Connector unterstützt die folgenden Dataproc-Versionen:
- Cluster mit Image-Version 1.5
- Cluster mit Image-Version 2.0
- 2.1 Cluster für die Image-Version
- 2.2 Cluster für Image-Version
- Dataproc Serverless-Laufzeitversion 1.0
Kosten berechnen
Wenn Sie eine der folgenden kostenpflichtigen Komponenten von Google Cloudverwenden, werden Ihnen die von Ihnen genutzten Ressourcen in Rechnung gestellt:
- Bigtable (die Verwendung des Bigtable-Emulators ist kostenlos)
- Dataproc
- Cloud Storage
Die Dataproc-Preise gelten für die Verwendung von Dataproc in Compute Engine-Clustern. Die Preise für Dataproc Serverless gelten für Arbeitslasten und Sitzungen, die in Dataproc Serverless für Spark ausgeführt werden.
Mit dem Preisrechner können Sie eine Kostenschätzung für Ihre voraussichtliche Nutzung vornehmen.
Hinweise
Erfüllen Sie die folgenden Voraussetzungen, bevor Sie den Bigtable Spark-Connector verwenden.
Erforderliche Rollen
Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen für Ihr Projekt zuzuweisen, um die Berechtigungen zu erhalten, die Sie zur Verwendung des Bigtable Spark-Connectors benötigen:
-
Bigtable-Administrator (
roles/bigtable.admin
)(optional): Ermöglicht das Lesen oder Schreiben von Daten und das Erstellen einer neuen Tabelle. -
Bigtable-Nutzer (
roles/bigtable.user
): Mit dieser Rolle können Sie Daten lesen oder schreiben, aber keine neue Tabelle erstellen.
Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.
Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.
Wenn Sie Dataproc oder Cloud Storage verwenden, sind möglicherweise zusätzliche Berechtigungen erforderlich. Weitere Informationen finden Sie unter Dataproc-Berechtigungen und Cloud Storage-Berechtigungen.
Spark einrichten
Neben dem Erstellen einer Bigtable-Instanz müssen Sie auch Ihre Spark-Instanz einrichten. Sie können dies lokal tun oder eine der folgenden Optionen auswählen, um Spark mit Dataproc zu verwenden:
- Dataproc-Cluster
- Dataproc Serverless
Weitere Informationen zur Auswahl zwischen einem Dataproc-Cluster und einer serverlosen Option finden Sie in der Dokumentation Dataproc Serverless für Spark im Vergleich zu Dataproc in Compute Engine .
Connector-JAR-Datei herunterladen
Den Quellcode des Bigtable Spark-Connectors mit Beispielen finden Sie im GitHub-Repository des Bigtable Spark-Connectors.
Je nach Spark-Einrichtung können Sie so auf die JAR-Datei zugreifen:
Wenn Sie PySpark lokal ausführen, sollten Sie die JAR-Datei des Connectors vom Cloud Storage-Speicherort
gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar
herunterladen.Ersetzen Sie
SCALA_VERSION
durch2.12
oder2.13
, die einzigen unterstützten Scala-Versionen, und ersetzen SieCONNECTOR_VERSION
durch die Connector-Version, die Sie verwenden möchten.Verwenden Sie für Dataproc-Cluster oder die serverlose Option die neueste JAR-Datei als Artefakt, das in Ihre Scala- oder Java-Spark-Anwendungen eingefügt werden kann. Weitere Informationen zur Verwendung der JAR-Datei als Artefakt finden Sie unter Abhängigkeiten verwalten.
Wenn Sie Ihren PySpark-Job an Dataproc senden, verwenden Sie das
gcloud dataproc jobs submit pyspark --jars
-Flag, um den URI für den Speicherort der JAR-Datei in Cloud Storage festzulegen, z. B.gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar
.
Compute-Typ ermitteln
Für schreibgeschützte Jobs können Sie serverloses Computing von Data Boost verwenden, um die Cluster, die Ihre Anwendungen bereitstellen, nicht zu beeinträchtigen. Ihre Spark-Anwendung muss Version 1.1.0 oder höher des Spark-Connectors verwenden, um Data Boost nutzen zu können.
Wenn Sie Data Boost verwenden möchten, müssen Sie ein Data Boost-Anwendungsprofil erstellen und dann die Anwendungsprofil-ID für die spark.bigtable.app_profile.id
-Spark-Option angeben, wenn Sie Ihrer Spark-Anwendung die Bigtable-Konfiguration hinzufügen. Wenn Sie bereits ein Anwendungsprofil für Ihre Spark-Lesejobs erstellt haben und es weiterhin verwenden möchten, ohne den Anwendungscode zu ändern, können Sie das Anwendungsprofil in ein Data Boost-Anwendungsprofil umwandeln. Weitere Informationen finden Sie unter App-Profil umwandeln.
Weitere Informationen finden Sie in der Übersicht über Bigtable Data Boost.
Bei Jobs, die Lese- und Schreibvorgänge umfassen, können Sie die Clusterknoten Ihrer Instanz für das Computing verwenden, indem Sie ein Standardanwendungsprofil in Ihrer Anfrage angeben.
Anwendungsprofil ermitteln oder erstellen
Wenn Sie keine ID für das Anwendungsprofil angeben, verwendet der Connector das Standard-Anwendungsprofil.
Wir empfehlen, für jede ausgeführte Anwendung, einschließlich Ihrer Spark-Anwendung, ein eigenes App-Profil zu verwenden. Weitere Informationen zu Anwendungsprofiltypen und ‑einstellungen finden Sie unter Anwendungsprofile – Übersicht. Eine Anleitung finden Sie unter Anwendungsprofile erstellen und konfigurieren.
Bigtable-Konfiguration zu Ihrer Spark-Anwendung hinzufügen
Fügen Sie Ihrer Spark-Anwendung die Spark-Optionen hinzu, mit denen Sie mit Bigtable interagieren können.
Unterstützte Spark-Optionen
Verwenden Sie die Spark-Optionen, die im com.google.cloud.spark.bigtable
-Paket verfügbar sind.
Optionsname | Erforderlich | Standardwert | Bedeutung |
---|---|---|---|
spark.bigtable.project.id |
Ja | – | Legen Sie die Bigtable-Projekt-ID fest. |
spark.bigtable.instance.id |
Ja | – | Legen Sie die Bigtable-Instanz-ID fest. |
catalog |
Ja | – | Legen Sie das JSON-Format fest, das das Konvertierungsformat zwischen dem SQL-ähnlichen Schema des DataFrames und dem Schema der Bigtable-Tabelle angibt. Weitere Informationen finden Sie unter Tabellenmetadaten im JSON-Format erstellen. |
spark.bigtable.app_profile.id |
Nein | default |
Bigtable-App-Profil-ID festlegen. |
spark.bigtable.write.timestamp.milliseconds |
Nein | Aktuelle Systemzeit | Legen Sie den Zeitstempel in Millisekunden fest, der beim Schreiben eines DataFrame in Bigtable verwendet werden soll. Da alle Zeilen im DataFrame denselben Zeitstempel verwenden, werden Zeilen mit derselben Spalte für den Zeilenschlüssel im DataFrame als eine einzelne Version in Bigtable beibehalten, da sie denselben Zeitstempel haben. |
spark.bigtable.create.new.table |
Nein | false |
Auf true festgelegt, um vor dem Schreiben in Bigtable eine neue Tabelle zu erstellen. |
spark.bigtable.read.timerange.start.milliseconds oder spark.bigtable.read.timerange.end.milliseconds |
Nein | – | Legen Sie Zeitstempel (in Millisekunden seit der Epochenzeit) fest, um Zellen mit einem bestimmten Start- und Enddatum zu filtern. |
spark.bigtable.push.down.row.key.filters |
Nein | true |
Auf true setzen, um serverseitiges Filtern nach einfachem Zeilenschlüssel zu ermöglichen. Das Filtern nach zusammengesetzten Zeilenschlüsseln wird clientseitig implementiert.Weitere Informationen finden Sie unter Eine bestimmte DataFrame-Zeile mit einem Filter lesen. |
spark.bigtable.read.rows.attempt.timeout.milliseconds |
Nein | 30 m | Legen Sie die timeout-Dauer für einen Versuch zum Lesen von Zeilen fest, der einer DataFrame-Partition im Bigtable-Client für Java entspricht. |
spark.bigtable.read.rows.total.timeout.milliseconds |
Nein | 12 Std. | Legen Sie die Gesamtdauer für das Zeitlimit für einen Versuch zum Lesen von Zeilen fest, der einer DataFrame-Partition im Bigtable-Client für Java entspricht. |
spark.bigtable.mutate.rows.attempt.timeout.milliseconds |
Nein | 1m | Legen Sie die timeout-Dauer für einen Versuch zum Ändern von Zeilen fest, der einer DataFrame-Partition im Bigtable-Client für Java entspricht. |
spark.bigtable.mutate.rows.total.timeout.milliseconds |
Nein | 10 Min. | Legen Sie die Dauer für das Gesamt--Zeitlimit für einen Versuch zum Ändern von Zeilen fest, der einer DataFrame-Partition im Bigtable-Client für Java entspricht. |
spark.bigtable.batch.mutate.size |
Nein | 100 |
Wird auf die Anzahl der Mutationen in jedem Batch festgelegt. Der Höchstwert, den Sie festlegen können, ist 100000 . |
spark.bigtable.enable.batch_mutate.flow_control |
Nein | false |
Setzen Sie den Wert auf true , um die Ablaufsteuerung für Batchmutationen zu aktivieren. |
Tabellen-Metadaten im JSON-Format erstellen
Das Spark SQL DataFrames-Tabellenformat muss mithilfe eines Strings im JSON-Format in eine Bigtable-Tabelle konvertiert werden. Durch dieses String-JSON-Format ist das Datenformat mit Bigtable kompatibel. Sie können das JSON-Format in Ihrem Anwendungscode mit der Option .option("catalog", catalog_json_string)
übergeben.
Sehen Sie sich als Beispiel die folgende DataFrame-Tabelle und die entsprechende Bigtable-Tabelle an.
In diesem Beispiel werden die Spalten name
und birthYear
im DataFrame unter der Spaltenfamilie info
gruppiert und in name
bzw. birth_year
umbenannt. Die Spalte address
wird in der Spaltenfamilie location
mit demselben Spaltennamen gespeichert. Die Spalte id
aus dem DataFrame wird in den Bigtable-Zeilenschlüssel konvertiert.
Die Zeilenschlüssel haben in Bigtable keinen eigenen Spaltennamen. In diesem Beispiel wird id_rowkey
nur verwendet, um dem Connector mitzuteilen, dass es sich um die Spalte mit dem Zeilenschlüssel handelt. Sie können einen beliebigen Namen für die Spalte mit dem Zeilenschlüssel verwenden. Achten Sie jedoch darauf, dass Sie denselben Namen verwenden, wenn Sie das Feld "rowkey":"column_name"
im JSON-Format deklarieren.
DataFrame | Bigtable-Tabelle = t1 | |||||||
Spalten | Zeilenschlüssel | Spaltenfamilien | ||||||
Info | Standort | |||||||
Spalten | Spalten | |||||||
id | name | birthYear | Adresse | id_rowkey | name | birth_year | Adresse |
Das JSON-Format für den Katalog sieht so aus:
"""
{
"table": {"name": "t1"},
"rowkey": "id_rowkey",
"columns": {
"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"},
"name": {"cf": "info", "col": "name", "type": "string"},
"birthYear": {"cf": "info", "col": "birth_year", "type": "long"},
"address": {"cf": "location", "col": "address", "type": "string"}
}
}
"""
Die im JSON-Format verwendeten Schlüssel und Werte sind:
Katalogsicherheitsschlüssel | Katalogwert | JSON-Format |
---|---|---|
Tabelle | Name der Bigtable-Tabelle. | "table":{"name":"t1"} Wenn die Tabelle nicht vorhanden ist, verwenden Sie .option("spark.bigtable.create.new.table", "true") , um eine Tabelle zu erstellen. |
rowkey | Name der Spalte, die als Bigtable-Zeilenschlüssel verwendet wird. Achten Sie darauf, dass der Spaltenname der DataFrame-Spalte als Zeilenschlüssel verwendet wird, z. B. id_rowkey . Zusammengesetzte Schlüssel werden auch als Zeilenschlüssel akzeptiert. Beispiel: "rowkey":"name:address" Bei diesem Ansatz kann es zu Zeilenschlüsseln kommen, für die bei allen Leseanfragen ein vollständiger Tabellenscan erforderlich ist. |
"rowkey":"id_rowkey" , |
Spalten | Zuordnung jeder DataFrame-Spalte zur entsprechenden Bigtable-Spaltenfamilie ("cf" ) und zum entsprechenden Spaltennamen ("col" ). Der Spaltenname kann sich vom Spaltennamen in der DataFrame-Tabelle unterscheiden. Unterstützte Datentypen sind string , long und binary . |
"columns": {"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"}, "name": {"cf": "info", "col": "name", "type": "string"}, "birthYear": {"cf":"info", "col": "birth_year", "type": "long"}, "address": {"cf": "location", "col": "address", "type":"string"}}" In diesem Beispiel ist id_rowkey der Zeilenschlüssel und info und location sind die Spaltenfamilien. |
Unterstützte Datentypen
Der Connector unterstützt die Verwendung der Typen string
, long
und binary
(Byte-Array) im Katalog. Bis die Unterstützung für andere Typen wie int
und float
hinzugefügt wird, können Sie solche Datentypen manuell in Byte-Arrays (BinaryType
von Spark SQL) konvertieren, bevor Sie den Connector verwenden, um sie in Bigtable zu schreiben.
Außerdem können Sie Avro zum Serialisieren komplexer Typen wie ArrayType
verwenden. Weitere Informationen finden Sie unter Komplexe Datentypen mit Apache Avro serialisieren.
In Bigtable schreiben
Verwenden Sie die Funktion .write()
und die unterstützten Optionen, um Ihre Daten in Bigtable zu schreiben.
Java
Im folgenden Code aus dem GitHub-Repository wird Java und Maven verwendet, um in Bigtable zu schreiben.
String catalog = "{" +
"\"table\":{\"name\":\"" + tableName + "\"," +
"\"tableCoder\":\"PrimitiveType\"}," +
"\"rowkey\":\"wordCol\"," +
"\"columns\":{" +
"\"word\":{\"cf\":\"rowkey\", \"col\":\"wordCol\", \"type\":\"string\"}," +
"\"count\":{\"cf\":\"example_family\", \"col\":\"countCol\", \"type\":\"long\"}" +
"}}".replaceAll("\\s+", "");
…
private static void writeDataframeToBigtable(Dataset<Row> dataframe, String catalog,
String createNewTable) {
dataframe
.write()
.format("bigtable")
.option("catalog", catalog)
.option("spark.bigtable.project.id", projectId)
.option("spark.bigtable.instance.id", instanceId)
.option("spark.bigtable.create.new.table", createNewTable)
.save();
}
Python
Im folgenden Code aus dem GitHub-Repository wird Python verwendet, um in Bigtable zu schreiben.
catalog = ''.join(("""{
"table":{"name":" """ + bigtable_table_name + """
", "tableCoder":"PrimitiveType"},
"rowkey":"wordCol",
"columns":{
"word":{"cf":"rowkey", "col":"wordCol", "type":"string"},
"count":{"cf":"example_family", "col":"countCol", "type":"long"}
}
}""").split())
…
input_data = spark.createDataFrame(data)
print('Created the DataFrame:')
input_data.show()
input_data.write \
.format('bigtable') \
.options(catalog=catalog) \
.option('spark.bigtable.project.id', bigtable_project_id) \
.option('spark.bigtable.instance.id', bigtable_instance_id) \
.option('spark.bigtable.create.new.table', create_new_table) \
.save()
print('DataFrame was written to Bigtable.')
…
Aus Bigtable lesen
Mit der Funktion .read()
können Sie prüfen, ob die Tabelle erfolgreich in Bigtable importiert wurde.
Java
…
private static Dataset<Row> readDataframeFromBigtable(String catalog) {
Dataset<Row> dataframe = spark
.read()
.format("bigtable")
.option("catalog", catalog)
.option("spark.bigtable.project.id", projectId)
.option("spark.bigtable.instance.id", instanceId)
.load();
return dataframe;
}
Python
…
records = spark.read \
.format('bigtable') \
.option('spark.bigtable.project.id', bigtable_project_id) \
.option('spark.bigtable.instance.id', bigtable_instance_id) \
.options(catalog=catalog) \
.load()
print('Reading the DataFrame from Bigtable:')
records.show()
Projekt kompilieren
Generieren Sie die JAR-Datei, die zum Ausführen eines Jobs in einem Dataproc-Cluster, in Dataproc Serverless oder in einer lokalen Spark-Instanz verwendet wird. Sie können die JAR-Datei lokal kompilieren und dann damit einen Job einreichen. Der Pfad zum kompilierten JAR wird beim Einreichen eines Jobs als Umgebungsvariable PATH_TO_COMPILED_JAR
festgelegt.
Dieser Schritt gilt nicht für PySpark-Anwendungen.
Abhängigkeiten verwalten
Der Bigtable Spark-Connector unterstützt die folgenden Tools zur Verwaltung von Abhängigkeiten:
JAR-Datei kompilieren
Maven
Fügen Sie die
spark-bigtable
-Abhängigkeit in Ihre pom.xml-Datei ein.<dependencies> <dependency> <groupId>com.google.cloud.spark.bigtable</groupId> <artifactId>spark-bigtable_SCALA_VERSION</artifactId> <version>0.1.0</version> </dependency> </dependencies>
Fügen Sie das Maven-Shade-Plug-in in Ihre
pom.xml
-Datei ein, um eine Uber-JAR-Datei zu erstellen:<plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <version>3.2.4</version> <executions> <execution> <phase>package</phase> <goals> <goal>shade</goal> </goals> </execution> </executions> </plugin> </plugins>
Führen Sie den Befehl
mvn clean install
aus, um eine JAR-Datei zu generieren.
sbt
Fügen Sie der Datei
build.sbt
die Abhängigkeitspark-bigtable
hinzu:libraryDependencies += "com.google.cloud.spark.bigtable" % "spark-bigtable_SCALA_VERSION" % "0.1.0{""}}"
Fügen Sie das
sbt-assembly
-Plug-in Ihrerproject/plugins.sbt
- oderproject/assembly.sbt
-Datei hinzu, um eine Uber-JAR-Datei zu erstellen.addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")
Führen Sie den Befehl
sbt clean assembly
aus, um die JAR-Datei zu generieren.
Gradle
Fügen Sie der Datei
build.gradle
die Abhängigkeitspark-bigtable
hinzu.dependencies { implementation group: 'com.google.cloud.bigtable', name: 'spark-bigtable_SCALA_VERSION', version: '0.1.0' }
Fügen Sie das Shadow-Plug-in in der Datei
build.gradle
hinzu, um eine Uber-JAR-Datei zu erstellen:plugins { id 'com.github.johnrengelman.shadow' version '8.1.1' id 'java' }
Weitere Informationen zur Konfiguration und zum Kompilieren von JAR-Dateien finden Sie in der Dokumentation zum Shadow-Plug-in.
Job senden
Senden Sie einen Spark-Job mit Dataproc, Dataproc Serverless oder einer lokalen Spark-Instanz, um Ihre Anwendung zu starten.
Laufzeitumgebung festlegen
Legen Sie die folgenden Umgebungsvariablen fest:
#Google Cloud
export BIGTABLE_SPARK_PROJECT_ID=PROJECT_ID
export BIGTABLE_SPARK_INSTANCE_ID=INSTANCE_ID
export BIGTABLE_SPARK_TABLE_NAME=TABLE_NAME
export BIGTABLE_SPARK_DATAPROC_CLUSTER=DATAPROC_CLUSTER
export BIGTABLE_SPARK_DATAPROC_REGION=DATAPROC_REGION
export BIGTABLE_SPARK_DATAPROC_ZONE=DATAPROC_ZONE
#Dataproc Serverless
export BIGTABLE_SPARK_SUBNET=SUBNET
export BIGTABLE_SPARK_GCS_BUCKET_NAME=GCS_BUCKET_NAME
#Scala/Java
export PATH_TO_COMPILED_JAR=PATH_TO_COMPILED_JAR
#PySpark
export GCS_PATH_TO_CONNECTOR_JAR=GCS_PATH_TO_CONNECTOR_JAR
export PATH_TO_PYTHON_FILE=PATH_TO_PYTHON_FILE
export LOCAL_PATH_TO_CONNECTOR_JAR=LOCAL_PATH_TO_CONNECTOR_JAR
Ersetzen Sie Folgendes:
- PROJECT_ID: Die permanente Kennzeichnung des Bigtable-Projekts.
- INSTANCE_ID: Die permanente Kennzeichnung der Bigtable-Instanz.
- TABLE_NAME: Die permanente Kennzeichnung der Tabelle.
- DATAPROC_CLUSTER: Die permanente Kennzeichnung des Dataproc-Clusters.
- DATAPROC_REGION: Die Dataproc-Region, die einen der Cluster in Ihrer Dataproc-Instanz enthält, z. B.
northamerica-northeast2
. - DATAPROC_ZONE: Die Zone, in der der Dataproc-Cluster ausgeführt wird.
- SUBNET: Der vollständige Ressourcenpfad des Subnetzes.
- GCS_BUCKET_NAME: Der Cloud Storage-Bucket zum Hochladen von Spark-Workload-Abhängigkeiten.
- PATH_TO_COMPILED_JAR: Der vollständige oder relative Pfad zur kompilierten JAR-Datei, z. B.
/path/to/project/root/target/<compiled_JAR_name>
für Maven. - GCS_PATH_TO_CONNECTOR_JAR: Der
gs://spark-lib/bigtable
Cloud Storage-Bucket, in dem sich die Dateispark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar
befindet. - PATH_TO_PYTHON_FILE: Bei PySpark-Anwendungen der Pfad zur Python-Datei, die zum Schreiben von Daten in Bigtable und zum Lesen von Daten aus Bigtable verwendet wird.
- LOCAL_PATH_TO_CONNECTOR_JAR: Für PySpark-Anwendungen der Pfad zur heruntergeladenen JAR-Datei des Bigtable Spark-Connectors.
Spark-Job senden
Führen Sie für Dataproc-Instanzen oder Ihre lokale Spark-Einrichtung einen Spark-Job aus, um Daten in Bigtable hochzuladen.
Dataproc-Cluster
Verwenden Sie die kompilierte JAR-Datei und erstellen Sie einen Dataproc-Clusterjob, der Daten aus Bigtable liest und in Bigtable schreibt.
Erstellen Sie einen Dataproc-Cluster. Im folgenden Beispiel wird ein Beispielbefehl zum Erstellen eines Dataproc 2.0-Clusters mit Debian 10, zwei Worker-Knoten und Standardkonfigurationen gezeigt.
gcloud dataproc clusters create \ $BIGTABLE_SPARK_DATAPROC_CLUSTER --region $BIGTABLE_SPARK_DATAPROC_REGION \ --zone $BIGTABLE_SPARK_DATAPROC_ZONE \ --master-machine-type n2-standard-4 --master-boot-disk-size 500 \ --num-workers 2 --worker-machine-type n2-standard-4 --worker-boot-disk-size 500 \ --image-version 2.0-debian10 --project $BIGTABLE_SPARK_PROJECT_ID
Senden Sie einen Job.
Scala/Java
Im folgenden Beispiel wird die Klasse
spark.bigtable.example.WordCount
gezeigt, die die Logik zum Erstellen einer Testtabelle in DataFrame, zum Schreiben der Tabelle in Bigtable und zum Zählen der Anzahl der Wörter in der Tabelle enthält.gcloud dataproc jobs submit spark \ --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \ --region=$BIGTABLE_SPARK_DATAPROC_REGION \ --class=spark.bigtable.example.WordCount \ --jar=$PATH_TO_COMPILED_JAR \ -- \ $BIGTABLE_SPARK_PROJECT_ID \ $BIGTABLE_SPARK_INSTANCE_ID \ $BIGTABLE_SPARK_TABLE_NAME \
PySpark
gcloud dataproc jobs submit pyspark \ --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \ --region=$BIGTABLE_SPARK_DATAPROC_REGION \ --jars=$GCS_PATH_TO_CONNECTOR_JAR \ --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \ $PATH_TO_PYTHON_FILE \ -- \ --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \ --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \ --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME \
Dataproc Serverless
Verwenden Sie die kompilierte JAR-Datei und erstellen Sie einen Dataproc-Job, der Daten mit einer serverlosen Dataproc-Instanz in Bigtable liest und schreibt.
Scala/Java
gcloud dataproc batches submit spark \
--region=$BIGTABLE_SPARK_DATAPROC_REGION \
--subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
--deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME --jar=$PATH_TO_COMPILED_JAR \
-- \
$BIGTABLE_SPARK_PROJECT_ID \
$BIGTABLE_SPARK_INSTANCE_ID \
$BIGTABLE_SPARK_TABLE_NAME
PySpark
gcloud dataproc batches submit pyspark $PATH_TO_PYTHON_FILE \
--region=$BIGTABLE_SPARK_DATAPROC_REGION \
--subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
--deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME \
--jars=$GCS_PATH_TO_CONNECTOR_JAR \
--properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
-- \
--bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
--bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
--bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME
Lokale Spark-Aufgabe
Verwenden Sie die heruntergeladene JAR-Datei und erstellen Sie einen Spark-Job, der Daten mit einer lokalen Spark-Instanz aus Bigtable liest und in Bigtable schreibt. Sie können den Spark-Job auch mit dem Bigtable-Emulator senden.
Bigtable-Emulator verwenden
Wenn Sie den Bigtable-Emulator verwenden möchten, gehen Sie so vor:
Verwenden Sie den folgenden Befehl, um den Emulator zu starten:
gcloud beta emulators bigtable start
Standardmäßig wählt der Emulator
localhost:8086
aus.Legen Sie die Umgebungsvariable
BIGTABLE_EMULATOR_HOST
fest:export BIGTABLE_EMULATOR_HOST=localhost:8086
Weitere Informationen zur Verwendung des Bigtable-Emulators finden Sie unter Mit dem Emulator testen.
Spark-Job senden
Verwenden Sie den Befehl spark-submit
, um einen Spark-Job zu senden, unabhängig davon, ob Sie einen lokalen Bigtable-Emulator verwenden.
Scala/Java
spark-submit $PATH_TO_COMPILED_JAR \
$BIGTABLE_SPARK_PROJECT_ID \
$BIGTABLE_SPARK_INSTANCE_ID \
$BIGTABLE_SPARK_TABLE_NAME
PySpark
spark-submit \
--jars=$LOCAL_PATH_TO_CONNECTOR_JAR \
--packages=org.slf4j:slf4j-reload4j:1.7.36 \
$PATH_TO_PYTHON_FILE \
--bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
--bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
--bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME
Tabellendaten prüfen
Führen Sie den folgenden cbt
-Befehlszeilenbefehl aus, um zu prüfen, ob die Daten in Bigtable geschrieben wurden. Die cbt
CLI ist eine Komponente der Google Cloud CLI. Weitere Informationen finden Sie in der
Übersicht über das cbt
-CLI.
cbt -project=$BIGTABLE_SPARK_PROJECT_ID -instance=$BIGTABLE_SPARK_INSTANCE_ID \
read $BIGTABLE_SPARK_TABLE_NAME
Zusätzliche Lösungen
Verwenden Sie den Bigtable Spark-Connector für bestimmte Lösungen, z. B. zum Serialisieren komplexer Spark SQL-Typen, zum Lesen bestimmter Zeilen und zum Generieren von clientseitigen Messwerten.
Bestimmte DataFrame-Zeile mit einem Filter lesen
Wenn Sie DataFrames zum Lesen aus Bigtable verwenden, können Sie einen Filter angeben, um nur bestimmte Zeilen zu lesen. Einfache Filter wie ==
, <=
und startsWith
für die Zeilenschlüsselspalte werden serverseitig angewendet, um einen vollständigen Tabellenscan zu vermeiden. Filter für zusammengesetzte Zeilenschlüssel oder komplexe Filter wie der LIKE
-Filter für die Spalte mit dem Zeilenschlüssel werden auf der Clientseite angewendet.
Wenn Sie große Tabellen lesen, empfehlen wir, einfache Zeilenschlüsselfilter zu verwenden, um einen vollständigen Tabellenscan zu vermeiden. Die folgende Beispielanweisung zeigt, wie mit einem einfachen Filter gelesen wird. Achten Sie darauf, dass Sie in Ihrem Spark-Filter den Namen der DataFrame-Spalte verwenden, die in den Zeilenschlüssel konvertiert wird:
dataframe.filter("id == 'some_id'").show()
Verwenden Sie beim Anwenden eines Filters den DataFrame-Spaltennamen anstelle des Bigtable-Tabellenspaltennamens.
Komplexe Datentypen mit Apache Avro serialisieren
Der Bigtable Spark-Connector unterstützt die Verwendung von Apache Avro zum Serialisieren komplexer Spark SQL-Typen wie ArrayType
, MapType
oder StructType
. Apache Avro bietet Datenserialisierung für Datensätze, die häufig zum Verarbeiten und Speichern komplexer Datenstrukturen verwendet werden.
Verwenden Sie eine Syntax wie "avro":"avroSchema"
, um anzugeben, dass eine Spalte in Bigtable mit Avro codiert werden soll. Sie können .option("avroSchema", avroSchemaString)
dann beim Lesen aus oder Schreiben in Bigtable verwenden, um das Avro-Schema für diese Spalte im Stringformat anzugeben. Sie können verschiedene Optionsnamen verwenden, z. B. "anotherAvroSchema"
für verschiedene Spalten, und Avro-Schemas für mehrere Spalten übergeben.
def catalogWithAvroColumn = s"""{
|"table":{"name":"ExampleAvroTable"},
|"rowkey":"key",
|"columns":{
|"col0":{"cf":"rowkey", "col":"key", "type":"string"},
|"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
|}
|}""".stripMargin
Clientseitige Messwerte verwenden
Da der Bigtable Spark-Connector auf dem Bigtable-Client für Java basiert, sind clientseitige Messwerte im Connector standardmäßig aktiviert. Weitere Informationen zum Zugriff auf diese Messwerte und zur Interpretation finden Sie in der Dokumentation zu clientseitigen Messwerten.
Bigtable-Client für Java mit RDD-Funktionen auf niedriger Ebene verwenden
Da der Bigtable Spark-Connector auf dem Bigtable-Client für Java basiert, können Sie den Client direkt in Ihren Spark-Anwendungen verwenden und verteilte Lese- oder Schreibanfragen in den RDD-Funktionen auf niedriger Ebene wie mapPartitions
und foreachPartition
ausführen.
Wenn Sie die Bigtable-Clientklassen für Java verwenden möchten, hängen Sie das Präfix com.google.cloud.spark.bigtable.repackaged
an die Paketnamen an. Verwenden Sie beispielsweise com.google.cloud.spark.bigtable.repackaged.com.google.cloud.bigtable.data.v2.BigtableDataClient
anstelle von com.google.cloud.bigtable.data.v2.BigtableDataClient
.
Weitere Informationen zum Bigtable-Client für Java finden Sie unter Bigtable-Client für Java.
Nächste Schritte
- Informationen zum Optimieren von Spark-Jobs in Dataproc
- Verwenden Sie Klassen aus dem Bigtable-Client für Java mit dem Bigtable Spark-Connector.