BigLake Metastore mit Dataproc verwenden

In diesem Dokument wird beschrieben, wie Sie den BigLake-Metastore mit Dataproc in Compute Engine verwenden. Mit dieser Verbindung erhalten Sie einen einzelnen, gemeinsam genutzten Metastore, der für Open-Source-Software-Engines wie Apache Spark oder Apache Flink funktioniert.

Hinweise

  1. Aktivieren Sie die Abrechnung für Ihr Google Cloud -Projekt. So prüfen Sie, ob die Abrechnung für ein Projekt aktiviert ist.
  2. Aktivieren Sie die BigQuery- und Dataproc-APIs.

    APIs aktivieren

  3. Optional: Funktionsweise von BigLake Metastore und Gründe für die Verwendung.

Erforderliche Rollen

Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen zuzuweisen, um die Berechtigungen zu erhalten, die Sie zur Verwendung von Spark oder Flink und Dataproc mit BigLake Metastore als Metadatenspeicher benötigen:

Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.

Allgemeiner Workflow

So verwenden Sie Dataproc in Compute Engine mit einem BigLake-Metastore:

  1. Erstellen Sie einen Dataproc-Cluster oder konfigurieren Sie einen vorhandenen Cluster.
  2. Verbinden Sie sich mit Ihrer bevorzugten Open-Source-Software-Engine, z. B. Spark oder Flink.
  3. Verwenden Sie eine JAR-Datei, um das Apache Iceberg-Katalog-Plug-in auf dem Cluster zu installieren.
  4. Erstellen und verwalten Sie Ihre BigLake Metastore-Ressourcen nach Bedarf, je nach der Open-Source-Software-Engine, die Sie verwenden.
  5. In BigQuery auf Ihre BigLake Metastore-Ressourcen zugreifen und diese verwenden.

BigLake Metastore mit Spark verbinden

In der folgenden Anleitung erfahren Sie, wie Sie Dataproc mithilfe von interaktivem Spark SQL mit dem BigLake-Metastore verbinden.

Iceberg-Katalog-Plug-in herunterladen

Wenn Sie BigLake Metastore mit Dataproc und Spark verbinden möchten, müssen Sie die JAR-Datei des BigLake Metastore Iceberg-Katalog-Plug-ins verwenden.

Diese Datei ist standardmäßig in Dataproc-Image-Version 2.2 enthalten. Wenn Ihre Dataproc-Cluster keinen direkten Zugriff auf das Internet haben, müssen Sie das Plug-in herunterladen und in einen Cloud Storage-Bucket hochladen, auf den Ihr Dataproc-Cluster zugreifen kann.

BigLake Metastore-Iceberg-Katalog-Plug-in herunterladen

Dataproc-Cluster konfigurieren

Bevor Sie eine Verbindung zum BigLake-Metastore herstellen, müssen Sie einen Dataproc-Cluster einrichten.

Dazu können Sie einen neuen Cluster erstellen oder einen vorhandenen verwenden. Anschließend verwenden Sie diesen Cluster, um interaktive Spark SQL-Abfragen auszuführen und Ihre BigLake Metastore-Ressourcen zu verwalten.

  • Für das Subnetz in der Region, in der der Cluster erstellt wird, muss der private Google-Zugriff aktiviert sein. Standardmäßig haben Dataproc-Cluster-VMs, die mit einer Image-Version 2.2 (Standard) oder höher erstellt wurden, nur interne IP-Adressen. Damit Cluster-VMs mit Google APIs kommunizieren können, müssen Sie den privaten Google-Zugriff für das Netzwerk-Subnetz default (oder den vom Nutzer angegebenen Netzwerknamen, falls zutreffend) in der Region aktivieren, in der der Cluster erstellt wird.

  • Wenn Sie das Beispiel für die Zeppelin-Weboberfläche in dieser Anleitung ausführen möchten, müssen Sie einen Dataproc-Cluster mit aktivierter optionaler Zeppelin-Komponente verwenden oder erstellen.

Neuer Cluster

Führen Sie den folgenden gcloud dataproc clusters create-Befehl aus, um einen neuen Dataproc-Cluster zu erstellen. Diese Konfiguration enthält die Einstellungen, die Sie für die Verwendung von BigLake Metastore benötigen.

gcloud dataproc clusters create CLUSTER_NAME \
  --project=PROJECT_ID \
  --region=LOCATION \
  --optional-components=ZEPPELIN \
  --enable-component-gateway \
  --single-node

Ersetzen Sie Folgendes:

  • CLUSTER_NAME: ein Name für Ihren Dataproc-Cluster.
  • PROJECT_ID: die ID des Google Cloud Projekts, in dem Sie den Cluster erstellen.
  • LOCATION: die Google Cloud Region, in der Sie den Cluster erstellen.

Vorhandener Cluster

Wenn Sie einen vorhandenen Cluster konfigurieren möchten, fügen Sie dem Cluster die folgende Iceberg Spark-Laufzeit hinzu.

org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1

Sie haben folgende Möglichkeiten, die Laufzeit hinzuzufügen:

Spark-Job senden

Verwenden Sie eine der folgenden Methoden, um einen Spark-Job zu senden:

gcloud-CLI

gcloud dataproc jobs submit spark-sql \
--project=PROJECT_ID \
--cluster=CLUSTER_NAME \
--region==REGION \
--jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
--properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \
spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \
spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \
spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \
spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
--execute="SPARK_SQL_COMMAND"

Ersetzen Sie Folgendes:

  • PROJECT_ID: die ID des Google Cloud Projekts, das den Dataproc-Cluster enthält.
  • CLUSTER_NAME: der Name des Dataproc-Clusters, den Sie zum Ausführen des Spark SQL-Jobs verwenden.
  • REGION: die Compute Engine-Region, in der sich Ihr Cluster befindet.
  • LOCATION: der Standort der BigQuery-Ressourcen.
  • CATALOG_NAME: der Name des Spark-Katalogs, den Sie mit Ihrem SQL-Job verwenden.
  • WAREHOUSE_DIRECTORY: Der Cloud Storage-Ordner, der Ihr Data Warehouse enthält. Dieser Wert beginnt mit gs://.
  • SPARK_SQL_COMMAND: Die Spark SQL-Abfrage, die Sie ausführen möchten. Diese Abfrage enthält die Befehle zum Erstellen Ihrer Ressourcen. So erstellen Sie beispielsweise einen Namespace und eine Tabelle.

Interaktiver Spark

Verbindung zu Spark herstellen und das Katalog-Plug-in installieren

Wenn Sie das Katalog-Plug-in für BigLake Metastore installieren möchten, stellen Sie über SSH eine Verbindung zu Ihrem Dataproc-Cluster her.

  1. Rufen Sie in der Google Cloud Console die Seite VM-Instanzen auf.
  2. Wenn Sie eine Verbindung zu einer Dataproc-VM-Instanz herstellen möchten, klicken Sie in der Liste der VM-Instanzen auf SSH. Die Ausgabe sieht etwa so aus:

    Connected, host fingerprint: ssh-rsa ...
    Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ...
    ...
    example-cluster@cluster-1-m:~$
    
  3. Führen Sie im Terminal den folgenden Befehl zur Initialisierung des BigLake-Metastores aus:

    spark-sql \
    --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
    --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
    --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
    --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
    --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

    Ersetzen Sie Folgendes:

    • CATALOG_NAME: der Name des Spark-Katalogs, den Sie mit Ihrem SQL-Job verwenden.
    • PROJECT_ID: Die Google Cloud Projekt-ID des BigLake Metastore-Katalogs, mit dem Ihr Spark-Katalog verknüpft ist.
    • LOCATION: Der Google Cloud Standort des BigLake-Metastores.
    • WAREHOUSE_DIRECTORY: Der Cloud Storage-Ordner, der Ihr Data Warehouse enthält. Dieser Wert beginnt mit gs://.

    Nachdem Sie erfolgreich eine Verbindung zu einem Cluster hergestellt haben, wird im Spark-Terminal der Prompt spark-sql angezeigt.

    spark-sql (default)>
    

BigLake Metastore-Ressourcen verwalten

Sie sind jetzt mit BigLake Metastore verbunden. Sie können Ihre vorhandenen Ressourcen ansehen oder neue Ressourcen auf Grundlage Ihrer in BigLake Metastore gespeicherten Metadaten erstellen.

Führen Sie beispielsweise die folgenden Befehle in der interaktiven Spark SQL-Sitzung aus, um einen Iceberg-Namespace und eine Iceberg-Tabelle zu erstellen.

  • Benutzerdefinierten Iceberg-Katalog verwenden:

    USE `CATALOG_NAME`;
  • Erstellen Sie einen Namespace:

    CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;
  • So verwenden Sie den erstellten Namespace:

    USE NAMESPACE_NAME;
  • Iceberg-Tabelle erstellen:

    CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;
  • So fügen Sie eine Tabellenzeile ein:

    INSERT INTO TABLE_NAME VALUES (1, "first row");
  • So fügen Sie eine Tabellenspalte hinzu:

    ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);
  • Tabellenmetadaten ansehen:

    DESCRIBE EXTENDED TABLE_NAME;
  • Tabellen im Namespace auflisten:

    SHOW TABLES;

Zeppelin-Notebook

  1. Rufen Sie in der Google Cloud -Konsole die Seite Dataproc-Cluster auf.

    Zu Dataproc-Clustern

  2. Klicken Sie auf den Namen des Clusters, den Sie verwenden möchten.

    Die Seite Clusterdetails wird geöffnet.

  3. Klicken Sie im Navigationsmenü auf Web-Schnittstellen.

  4. Klicken Sie unter Komponentengateway auf Zeppelin. Die Seite für Zeppelin-Notebooks wird geöffnet.

  5. Klicken Sie im Navigationsmenü auf Notebook und dann auf + Neue Notiz erstellen.

  6. Geben Sie im Dialogfeld einen Notebooknamen ein. Lassen Sie Spark als Standardinterpreter ausgewählt.

  7. Klicken Sie auf Erstellen. Ein neues Notebook wird erstellt.

  8. Klicken Sie im Notebook auf das Menü „Einstellungen“ und dann auf Interpreter.

  9. Suchen Sie im Feld Interpreters durchsuchen nach Spark.

  10. Klicken Sie auf Bearbeiten.

  11. Geben Sie im Feld Spark.jars den URI des Spark-JAR ein.

    https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar
    
  12. Klicken Sie auf Speichern.

  13. Klicken Sie auf OK.

  14. Kopieren Sie den folgenden PySpark-Code in Ihr Zeppelin-Notebook.

    %pyspark
    from pyspark.sql import SparkSession
    spark = SparkSession.builder \
    .appName("BigLake Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    spark.sql("select version()").show()
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;")
    spark.sql("DESCRIBE TABLE_NAME;").show()

    Ersetzen Sie Folgendes:

    • CATALOG_NAME: Der Name des Spark-Katalogs, der für den SQL-Job verwendet werden soll.
    • PROJECT_ID: die ID des Google Cloud Projekts, das den Dataproc-Cluster enthält.
    • WAREHOUSE_DIRECTORY: Der Cloud Storage-Ordner, der Ihr Data Warehouse enthält. Dieser Wert beginnt mit gs://.
    • NAMESPACE_NAME: Der Namespace-Name, der auf Ihre Spark-Tabelle verweist.
    • WAREHOUSE_DIRECTORY: Der URI des Cloud Storage-Ordners, in dem Ihr Data Warehouse gespeichert ist.
    • TABLE_NAME: ein Tabellenname für Ihre Spark-Tabelle.
  15. Klicken Sie auf das Ausführungssymbol oder drücken Sie Shift-Enter, um den Code auszuführen. Wenn der Job abgeschlossen ist, wird die Statusmeldung „Spark Job Finished“ angezeigt und in der Ausgabe wird der Tabelleninhalt angezeigt:

In der folgenden Anleitung erfahren Sie, wie Sie Dataproc mit dem Flink SQL-Client mit BigLake Metastore verbinden.

So verbinden Sie BigLake Metastore mit Flink:

  1. Erstellen Sie einen Dataproc-Cluster mit aktivierter optionaler Flink-Komponente und achten Sie darauf, dass Sie Dataproc 2.2 oder höher verwenden.
  2. Rufen Sie in der Google Cloud Console die Seite VM-Instanzen auf:

    Zu "VM-Instanzen"

  3. Klicken Sie in der Liste der VM-Instanzen auf SSH, um eine Verbindung zu einer Dataproc-VM-Instanz herzustellen.

  4. Konfigurieren Sie das benutzerdefinierte Iceberg-Katalog-Plug-in für BigLake Metastore:

    FLINK_VERSION=1.17
    ICEBERG_VERSION=1.5.2
    
    cd /usr/lib/flink
    
    sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib
    
    sudo gcloud storage cp gs://spark-lib/bigquery/iceberg-bigquery-catalog-${ICEBERG_VERSION}-1.0.1-beta.jar lib/
  5. Starten Sie die Flink-Sitzung in YARN:

    HADOOP_CLASSPATH=`hadoop classpath`
    
    sudo bin/yarn-session.sh -nm flink-dataproc -d
    
    sudo bin/sql-client.sh embedded \
      -s yarn-session
  6. Katalog in Flink erstellen:

    CREATE CATALOG CATALOG_NAME WITH (
      'type'='iceberg',
      'warehouse'='WAREHOUSE_DIRECTORY',
      'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog',
      'gcp_project'='PROJECT_ID',
      'gcp_location'='LOCATION'
    );

    Ersetzen Sie Folgendes:

    • CATALOG_NAME: Die Flink-Katalogkennung, die mit einem BigLake Metastore-Katalog verknüpft ist.
    • WAREHOUSE_DIRECTORY: Der Basispfad für das Warehouse-Verzeichnis (der Cloud Storage-Ordner, in dem Flink Dateien erstellt). Dieser Wert beginnt mit gs://.
    • PROJECT_ID: Die Projekt-ID des BigLake Metastore-Katalogs, mit dem der Flink-Katalog verknüpft ist.
    • LOCATION: Der Standort der BigQuery-Ressourcen.

Ihre Flink-Sitzung ist jetzt mit BigLake Metastore verbunden und Sie können Flink SQL-Befehle ausführen.

Nachdem Sie eine Verbindung zu BigLake Metastore hergestellt haben, können Sie Ressourcen basierend auf den in BigLake Metastore gespeicherten Metadaten erstellen und ansehen.

Führen Sie beispielsweise die folgenden Befehle in Ihrer interaktiven Flink SQL-Sitzung aus, um eine Iceberg-Datenbank und -Tabelle zu erstellen.

  1. Benutzerdefinierten Iceberg-Katalog verwenden:

    USE CATALOG CATALOG_NAME;

    Ersetzen Sie CATALOG_NAME durch Ihre Flink-Katalog-ID.

  2. Erstellen Sie eine Datenbank. Dadurch wird ein Dataset in BigQuery erstellt:

    CREATE DATABASE IF NOT EXISTS DATABASE_NAME;

    Ersetzen Sie DATABASE_NAME durch den Namen Ihrer neuen Datenbank.

  3. Verwenden Sie die Datenbank, die Sie erstellt haben:

    USE DATABASE_NAME;
  4. Iceberg-Tabelle erstellen Im Folgenden wird eine Beispielverkaufstabelle erstellt:

    CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME (
        order_number BIGINT,
        price        DECIMAL(32,2),
        buyer        ROW<first_name STRING, last_name STRING>,
        order_time   TIMESTAMP(3)
    );

    Ersetzen Sie ICEBERG_TABLE_NAME durch einen Namen für die neue Tabelle.

  5. Tabellenmetadaten ansehen:

    DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
  6. Tabellen in der Datenbank auflisten:

    SHOW TABLES;

Daten in die Tabelle aufnehmen

Nachdem Sie im vorherigen Abschnitt eine Iceberg-Tabelle erstellt haben, können Sie Flink DataGen als Datenquelle verwenden, um Echtzeitdaten in Ihre Tabelle aufzunehmen. Die folgenden Schritte sind ein Beispiel für diesen Workflow:

  1. So erstellen Sie eine temporäre Tabelle mit DataGen:

    CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME
    WITH (
        'connector' = 'datagen',
        'rows-per-second' = '10',
        'fields.order_number.kind' = 'sequence',
        'fields.order_number.start' = '1',
        'fields.order_number.end' = '1000000',
        'fields.price.min' = '0',
        'fields.price.max' = '10000',
        'fields.buyer.first_name.length' = '10',
        'fields.buyer.last_name.length' = '10'
    )
    LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);

    Ersetzen Sie Folgendes:

    • DATABASE_NAME: der Name der Datenbank, in der die temporäre Tabelle gespeichert werden soll.
    • TEMP_TABLE_NAME: Ein Name für die temporäre Tabelle.
    • ICEBERG_TABLE_NAME: Der Name der Iceberg-Tabelle, die Sie im vorherigen Abschnitt erstellt haben.
  2. Legen Sie die Parallelität auf 1 fest:

    SET 'parallelism.default' = '1';
  3. Legen Sie das Prüfpunktintervall fest:

    SET 'execution.checkpointing.interval' = '10second';
  4. Prüfpunkt festlegen:

    SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
  5. Starten Sie den Echtzeit-Streamingjob:

    INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;

    Die Ausgabe sieht etwa so aus:

    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 0de23327237ad8a811d37748acd9c10b
    
  6. So prüfen Sie den Status des Streamingjobs:

    1. Rufen Sie in der Google Cloud Console die Seite Cluster auf.

      Zu den Clustern

    2. Wählen Sie Ihren Cluster aus.

    3. Klicken Sie auf den Tab Weboberflächen.

    4. Klicken Sie auf den Link YARN ResourceManager.

    5. Suchen Sie in der YARN ResourceManager-Oberfläche nach Ihrer Flink-Sitzung und klicken Sie unter Tracking UI auf den Link ApplicationMaster.

    6. Prüfen Sie in der Spalte Status, ob der Jobstatus Wird ausgeführt lautet.

  7. Streamingdaten im Flink SQL-Client abfragen:

    SELECT * FROM ICEBERG_TABLE_NAME
    /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/
    ORDER BY order_time desc
    LIMIT 20;
  8. Streamingdaten in BigQuery abfragen:

    SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME`
    ORDER BY order_time desc
    LIMIT 20;
  9. Beenden Sie den Streamingjob im Flink SQL-Client:

    STOP JOB 'JOB_ID';

    Ersetzen Sie JOB_ID durch die Job-ID, die in der Ausgabe angezeigt wurde, als Sie den Streamingjob erstellt haben.

Nächste Schritte