BigQuery-Metastore mit Dataproc verwenden
In diesem Dokument wird beschrieben, wie Sie den BigQuery-Metastore mit Dataproc in der Compute Engine verwenden. Diese Verbindung bietet einen einzigen freigegebenen Metastore, der für Open-Source-Software-Engines wie Apache Spark oder Apache Flink funktioniert.
Hinweise
- Aktivieren Sie die Abrechnung für Ihr Google Cloud -Projekt. So prüfen Sie, ob die Abrechnung für ein Projekt aktiviert ist
Aktivieren Sie die BigQuery- und Dataproc-APIs.
Optional: Informationen zur Funktionsweise des BigQuery-Metaspeichers und dazu, warum Sie ihn verwenden sollten.
Erforderliche Rollen
Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Verwenden von Spark oder Flink und Dataproc mit dem BigQuery-Metastore als Metadatenspeicher benötigen:
-
Dataproc-Cluster erstellen:
Dataproc-Worker (
roles/dataproc.worker
) im Standarddienstkonto der Compute Engine im Projekt -
BigQuery-Metastore-Tabellen in Spark oder Flink erstellen:
-
Dataproc-Worker (
roles/dataproc.worker
) für das Dataproc-VM-Dienstkonto im Projekt -
BigQuery-Datenbearbeiter (
roles/bigquery.dataEditor
) für das Dataproc-VM-Dienstkonto im Projekt -
Storage-Objekt-Administrator (
roles/storage.objectAdmin
) für das Dataproc-VM-Dienstkonto im Projekt
-
Dataproc-Worker (
-
BigQuery-Metastore-Tabellen in BigQuery abfragen:
-
BigQuery Data Viewer (
roles/bigquery.dataViewer
) für das Projekt -
BigQuery-Nutzer (
roles/bigquery.user
) für das Projekt -
Storage-Objekt-Betrachter (
roles/storage.objectViewer
) für das Projekt
-
BigQuery Data Viewer (
Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.
Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.
Allgemeiner Workflow
So verwenden Sie Dataproc in der Compute Engine mit dem BigQuery-Metastore:
- Erstellen Sie einen Dataproc-Cluster oder konfigurieren Sie einen vorhandenen Cluster.
- Stellen Sie eine Verbindung zu Ihrer bevorzugten Open-Source-Software-Engine wie Spark oder Flink her.
- Verwenden Sie eine JAR-Datei, um das Apache Iceberg-Katalog-Plug-in auf dem Cluster zu installieren.
- Erstellen und verwalten Sie Ihre BigQuery-Metastore-Ressourcen nach Bedarf, je nachdem, welche Open-Source-Software-Engine Sie verwenden.
- In BigQuery auf Ihre BigQuery Metastore-Ressourcen zugreifen und sie verwenden.
BigQuery-Metastore mit Spark verbinden
In der folgenden Anleitung wird beschrieben, wie Sie Dataproc mit dem BigQuery-Metastore über interaktive Spark SQL-Abfragen verbinden.
Iceberg-Katalog-Plug-in herunterladen
Wenn Sie den BigQuery-Metastore mit Dataproc und Spark verbinden möchten, müssen Sie die JAR-Datei des BigQuery-Metastore-Iceberg-Katalog-Plug-ins verwenden.
Diese Datei ist standardmäßig in der Dataproc-Image-Version 2.2 enthalten. Wenn Ihre Dataproc-Cluster keinen direkten Zugriff auf das Internet haben, müssen Sie das Plug-in herunterladen und in einen Cloud Storage-Bucket hochladen, auf den Ihr Dataproc-Cluster zugreifen kann.
Iceberg-Katalog-Plug-in für den BigQuery-Metastore herunterladen
Dataproc-Cluster konfigurieren
Bevor Sie eine Verbindung zum BigQuery-Metastore herstellen können, müssen Sie einen Dataproc-Cluster einrichten.
Dazu können Sie einen neuen Cluster erstellen oder einen vorhandenen verwenden. Anschließend können Sie diesen Cluster verwenden, um interaktive Spark SQL-Abfragen auszuführen und Ihre BigQuery-Metastore-Ressourcen zu verwalten.
Für das Subnetz in der Region, in der der Cluster erstellt wird, muss der private Google-Zugriff (Private Google Access, PGA) aktiviert sein. Standardmäßig haben Dataproc-Cluster-VMs, die mit der Image-Version 2.2 (Standard) oder höher erstellt wurden, nur interne IP-Adressen. Damit Cluster-VMs mit Google APIs kommunizieren können, müssen Sie den privaten Google-Zugriff im Netzwerksubnetz
default
(oder dem vom Nutzer angegebenen Netzwerknamen, falls zutreffend) in der Region aktivieren, in der der Cluster erstellt wird.Wenn Sie das Beispiel für die Zeppelin-Weboberfläche in dieser Anleitung ausführen möchten, müssen Sie einen Dataproc-Cluster mit aktivierter optionaler Zeppelin-Komponente verwenden oder erstellen.
Neuer Cluster
Führen Sie den folgenden gcloud
dataproc clusters create
-Befehl aus, um einen neuen Dataproc-Cluster zu erstellen. Diese Konfiguration enthält die Einstellungen, die Sie für die Verwendung des BigQuery-Metastores benötigen.
gcloud dataproc clusters create CLUSTER_NAME \ --project=PROJECT_ID \ --region=LOCATION \ --optional-components=ZEPPELIN \ --enable-component-gateway \ --single-node
Ersetzen Sie Folgendes:
CLUSTER_NAME
: ein Name für Ihren Dataproc-Cluster.PROJECT_ID
: die ID des Google Cloud Projekts, in dem Sie den Cluster erstellen.LOCATION
: die Google Cloud Region, in der Sie den Cluster erstellen.
Vorhandener Cluster
Wenn Sie einen vorhandenen Cluster konfigurieren möchten, fügen Sie ihm die folgende Iceberg-Spark-Laufzeit hinzu.
org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1
Sie haben folgende Möglichkeiten, die Laufzeit hinzuzufügen:
Initialisierungs-Script Fügen Sie die Laufzeitabhängigkeit einem benutzerdefinierten Initialisierungsskript hinzu, das beim Erstellen des Containers ausgeführt wird.
Nachdem Sie dem Script die Laufzeitabhängigkeit hinzugefügt haben, folgen Sie der Anleitung zum Erstellen, Neuerstellen und Aktualisieren eines Clusters.
Manuelle Installation: Fügen Sie die JAR-Datei des Iceberg-Katalog-Plug-ins manuell hinzu und konfigurieren Sie die Spark-Eigenschaften so, dass die Laufzeit in Ihrem Cluster enthalten ist.
Spark-Job senden
Sie haben folgende Möglichkeiten, einen Spark-Job einzureichen:
gcloud-CLI
gcloud dataproc jobs submit spark-sql \ --project=PROJECT_ID \ --cluster=CLUSTER_NAME \ --region==REGION \ --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \ spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \ spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \ spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \ spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \ --execute="SPARK_SQL_COMMAND"
Ersetzen Sie Folgendes:
PROJECT_ID
: die ID des Google Cloud Projekts, das den Dataproc-Cluster enthält.CLUSTER_NAME
: der Name des Dataproc-Clusters, mit dem Sie den Spark SQL-Job ausführen.REGION
: die Compute Engine-Region, in der sich Ihr Cluster befindet.LOCATION
: der Speicherort der BigQuery-Ressourcen.CATALOG_NAME
: der Name des Spark-Katalogs, den Sie für Ihren SQL-Job verwenden.WAREHOUSE_DIRECTORY
: den Cloud Storage-Ordner, der Ihr Data Warehouse enthält. Dieser Wert beginnt mitgs://
.SPARK_SQL_COMMAND
: Die Spark SQL-Abfrage, die Sie ausführen möchten. Diese Abfrage enthält die Befehle zum Erstellen Ihrer Ressourcen. Beispiel: Sie möchten einen Namespace und eine Tabelle erstellen.
Interaktive Funken
Verbindung zu Spark herstellen und das Katalog-Plug-in installieren
Um das Katalog-Plug-in für den BigQuery-Metastore zu installieren, stellen Sie über SSH eine Verbindung zu Ihrem Dataproc-Cluster her.
- Google Cloud Rufen Sie in der Console die Seite VM-Instanzen auf.
Wenn Sie eine Verbindung zu einer Dataproc-VM-Instanz herstellen möchten, klicken Sie in der Liste der VM-Instanzen auf SSH. Die Ausgabe sieht etwa so aus:
Connected, host fingerprint: ssh-rsa ... Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ... ... example-cluster@cluster-1-m:~$
Führen Sie im Terminal den folgenden Befehl zur Initialisierung des BigQuery-Metaspeichers aus:
spark-sql \ --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \ --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \ --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \ --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \ --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY
Ersetzen Sie Folgendes:
CATALOG_NAME
: der Name des Spark-Katalogs, den Sie für Ihren SQL-Job verwenden.PROJECT_ID
: Die Google Cloud Projekt-ID des BigQuery-Metastore-Katalogs, mit dem Ihr Spark-Katalog verknüpft ist.LOCATION
: Der Google Cloud Speicherort des BigQuery-Metaspeichers.WAREHOUSE_DIRECTORY
: den Cloud Storage-Ordner, der Ihr Data Warehouse enthält. Dieser Wert beginnt mitgs://
.
Nachdem Sie eine Verbindung zu einem Cluster hergestellt haben, wird im Spark-Terminal der Prompt
spark-sql
angezeigt.spark-sql (default)>
BigQuery-Metastore-Ressourcen verwalten
Sie sind jetzt mit dem BigQuery-Metastore verbunden. Sie können sich Ihre vorhandenen Ressourcen ansehen oder neue Ressourcen basierend auf den im BigQuery-Metastore gespeicherten Metadaten erstellen.
Führen Sie beispielsweise die folgenden Befehle in der interaktiven Spark SQL-Sitzung aus, um einen Iceberg-Namespace und eine Iceberg-Tabelle zu erstellen.
Benutzerdefinierten Iceberg-Katalog verwenden:
USE `CATALOG_NAME`;
Erstellen Sie einen Namespace:
CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;
Verwenden Sie den erstellten Namespace:
USE NAMESPACE_NAME;
So erstellen Sie eine Iceberg-Tabelle:
CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;
So fügen Sie eine Tabellenzeile ein:
INSERT INTO TABLE_NAME VALUES (1, "first row");
So fügen Sie eine Tabellenspalte hinzu:
ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);
So rufen Sie Tabellenmetadaten auf:
DESCRIBE EXTENDED TABLE_NAME;
Listen Sie die Tabellen im Namespace auf:
SHOW TABLES;
Zeppelin-Notebook
Rufen Sie in der Google Cloud Console die Seite Dataproc-Cluster auf.
Klicken Sie auf den Namen des Clusters, den Sie verwenden möchten.
Die Seite Clusterdetails wird geöffnet.
Klicken Sie im Navigationsmenü auf Weboberflächen.
Klicken Sie unter Komponentengateway auf Zeppelin. Die Seite Zeppelin-Notebook wird geöffnet.
Klicken Sie im Navigationsmenü auf Notizbuch und dann auf + Neue Notiz erstellen.
Geben Sie im Dialogfeld einen Namen für das Notizbuch ein. Lassen Sie Spark als Standard-Interpreter ausgewählt.
Klicken Sie auf Erstellen. Ein neues Notebook wird erstellt.
Klicken Sie im Notebook auf das Menü „Einstellungen“ und dann auf Interpreter.
Suchen Sie im Feld Nach Auswertern suchen nach Spark.
Klicken Sie auf Bearbeiten.
Geben Sie im Feld Spark.jars den URI des Spark-JAR-Objekts ein.
https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar
Klicken Sie auf Speichern.
Klicken Sie auf OK.
Kopieren Sie den folgenden PySpark-Code in Ihr Zeppelin-Notebook.
%pyspark from pyspark.sql import SparkSession spark = SparkSession.builder \ .appName("Metastore Iceberg") \ .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \ .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \ .getOrCreate() spark.sql("select version()").show() spark.sql("USE `CATALOG_NAME`;") spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;") spark.sql("USE NAMESPACE_NAME;") spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;") spark.sql("DESCRIBE TABLE_NAME;").show()
Ersetzen Sie Folgendes:
CATALOG_NAME
: Der Name des Spark-Katalogs, der für den SQL-Job verwendet werden soll.PROJECT_ID
: die ID des Google Cloud Projekts, das den Dataproc-Cluster enthält.WAREHOUSE_DIRECTORY
: den Cloud Storage-Ordner, der Ihr Data Warehouse enthält. Dieser Wert beginnt mitgs://
.NAMESPACE_NAME
: Der Name des Namespace, der auf Ihre Spark-Tabelle verweist.WAREHOUSE_DIRECTORY
: Der URI des Cloud Storage-Ordners, in dem Ihr Data Warehouse gespeichert ist.TABLE_NAME
: ein Tabellenname für Ihre Spark-Tabelle.
Klicken Sie auf das Symbol „Ausführen“ oder drücken Sie die Taste
Shift-Enter
, um den Code auszuführen. Wenn der Job abgeschlossen ist, wird in der Statusmeldung „Spark-Job abgeschlossen“ angezeigt und in der Ausgabe der Tabelleninhalt:
BigQuery-Metastore mit Flink verbinden
In der folgenden Anleitung wird beschrieben, wie Sie mit dem Flink SQL-Client eine Verbindung zwischen Dataproc und dem BigQuery-Metastore herstellen.
Katalog-Plug-in installieren und eine Verbindung zu einer Flink-Sitzung herstellen
So stellen Sie eine Verbindung zwischen dem BigQuery-Metastore und Flink her:
- Erstellen Sie einen Dataproc-Cluster mit aktivierter optionaler Flink-Komponente und verwenden Sie Dataproc 2.2 oder höher.
Rufen Sie in der Google Cloud -Console die Seite VM-Instanzen auf:
Klicken Sie in der Liste der VM-Instanzen auf SSH, um eine Verbindung zu einer Dataproc-VM-Instanz herzustellen.
Konfigurieren Sie das Iceberg-Plug-in für benutzerdefinierte Kataloge für den BigQuery-Metastore:
FLINK_VERSION=1.17 ICEBERG_VERSION=1.5.2 cd /usr/lib/flink sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib sudo gcloud storage cp gs://spark-lib/bigquery/iceberg-bigquery-catalog-${ICEBERG_VERSION}-1.0.1-beta.jar lib/
Starten Sie die Flink-Sitzung auf YARN:
HADOOP_CLASSPATH=`hadoop classpath` sudo bin/yarn-session.sh -nm flink-dataproc -d sudo bin/sql-client.sh embedded \ -s yarn-session
So erstellst du einen Katalog in Flink:
CREATE CATALOG CATALOG_NAME WITH ( 'type'='iceberg', 'warehouse'='WAREHOUSE_DIRECTORY', 'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog', 'gcp_project'='PROJECT_ID', 'gcp_location'='LOCATION' );
Ersetzen Sie Folgendes:
CATALOG_NAME
: die Flink-Katalog-ID, die mit einem BigQuery-Metastore-Katalog verknüpft ist.WAREHOUSE_DIRECTORY
: Der Basispfad für das Warehouse-Verzeichnis (der Cloud Storage-Ordner, in dem Flink Dateien erstellt). Dieser Wert beginnt mitgs://
.PROJECT_ID
: Die Projekt-ID des BigQuery-Metastore-Katalogs, mit dem der Flink-Katalog verknüpft ist.LOCATION
: den Speicherort der BigQuery-Ressourcen.
Ihre Flink-Sitzung ist jetzt mit dem BigQuery-Metastore verbunden und Sie können Flink-SQL-Befehle ausführen.
BigQuery-Metastore-Ressourcen verwalten
Nachdem Sie eine Verbindung zum BigQuery-Metastore hergestellt haben, können Sie Ressourcen basierend auf den im BigQuery-Metastore gespeicherten Metadaten erstellen und aufrufen.
Führen Sie beispielsweise die folgenden Befehle in Ihrer interaktiven Flink SQL-Sitzung aus, um eine Iceberg-Datenbank und ‑Tabelle zu erstellen.
Benutzerdefinierten Iceberg-Katalog verwenden:
USE CATALOG CATALOG_NAME;
Ersetzen Sie
CATALOG_NAME
durch Ihre Flink-Katalog-ID.So erstellen Sie eine Datenbank, die ein Dataset in BigQuery erstellt:
CREATE DATABASE IF NOT EXISTS DATABASE_NAME;
Ersetzen Sie
DATABASE_NAME
durch den Namen Ihrer neuen Datenbank.Verwenden Sie die von Ihnen erstellte Datenbank:
USE DATABASE_NAME;
Erstellen Sie eine Iceberg-Tabelle. Im Folgenden wird eine Beispieltabelle für Verkäufe erstellt:
CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME ( order_number BIGINT, price DECIMAL(32,2), buyer ROW<first_name STRING, last_name STRING>, order_time TIMESTAMP(3) );
Ersetzen Sie
ICEBERG_TABLE_NAME
durch einen Namen für die neue Tabelle.So rufen Sie Tabellenmetadaten auf:
DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
Tabellen in der Datenbank auflisten:
SHOW TABLES;
Daten in die Tabelle aufnehmen
Nachdem Sie im vorherigen Abschnitt eine Iceberg-Tabelle erstellt haben, können Sie Flink DataGen als Datenquelle verwenden, um Echtzeitdaten in Ihre Tabelle aufzunehmen. Die folgenden Schritte sind ein Beispiel für diesen Workflow:
So erstellen Sie eine temporäre Tabelle mit DataGen:
CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME WITH ( 'connector' = 'datagen', 'rows-per-second' = '10', 'fields.order_number.kind' = 'sequence', 'fields.order_number.start' = '1', 'fields.order_number.end' = '1000000', 'fields.price.min' = '0', 'fields.price.max' = '10000', 'fields.buyer.first_name.length' = '10', 'fields.buyer.last_name.length' = '10' ) LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);
Ersetzen Sie Folgendes:
DATABASE_NAME
: der Name der Datenbank, in der die temporäre Tabelle gespeichert werden soll.TEMP_TABLE_NAME
: Name der temporären Tabelle.ICEBERG_TABLE_NAME
: der Name der Iceberg-Tabelle, die Sie im vorherigen Abschnitt erstellt haben.
Legen Sie die Parallelität auf „1“ fest:
SET 'parallelism.default' = '1';
Legen Sie das Intervall für den Prüfpunkt fest:
SET 'execution.checkpointing.interval' = '10second';
Prüfpunkt festlegen:
SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
Starten Sie den Echtzeit-Streaming-Job:
INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;
Die Ausgabe sieht etwa so aus:
[INFO] Submitting SQL update statement to the cluster... [INFO] SQL update statement has been successfully submitted to the cluster: Job ID: 0de23327237ad8a811d37748acd9c10b
So prüfen Sie den Status des Streamingjobs:
Rufen Sie in der Google Cloud Console die Seite Cluster auf.
Wählen Sie Ihren Cluster aus.
Klicken Sie auf den Tab Weboberflächen.
Klicken Sie auf den Link YARN ResourceManager.
Suchen Sie auf der Benutzeroberfläche von YARN ResourceManager nach Ihrer Flink-Sitzung und klicken Sie unter Tracking-UI auf den Link ApplicationMaster.
Prüfen Sie in der Spalte Status, ob der Jobstatus Wird ausgeführt lautet.
Streamingdaten im Flink SQL-Client abfragen:
SELECT * FROM ICEBERG_TABLE_NAME /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/ ORDER BY order_time desc LIMIT 20;
Streamingdaten in BigQuery abfragen:
SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME` ORDER BY order_time desc LIMIT 20;
Beenden Sie den Streaming-Job im Flink SQL-Client:
STOP JOB 'JOB_ID';
Ersetzen Sie
JOB_ID
durch die Job-ID, die in der Ausgabe angezeigt wurde, als Sie den Streamingjob erstellt haben.
Nächste Schritte
- Richten Sie optionale BigQuery-Metastore-Funktionen ein.
- Tabellen aus Spark in BigQuery ansehen und abfragen