Usa el metastore de BigQuery con Dataproc

En este documento, se explica cómo usar el metastore de BigQuery con Dataproc en Compute Engine. Esta conexión te proporciona un metastore único y compartido que funciona en motores de software de código abierto, como Apache Spark o Apache Flink.

Antes de comenzar

  1. Habilita la facturación para tu proyecto de Google Cloud. Obtén información sobre cómo verificar si la facturación está habilitada en un proyecto.
  2. Habilita las APIs de BigQuery y Dataproc.

    Habilitar las API

  3. Opcional: Comprende cómo funciona el metastore de BigQuery y por qué deberías usarlo.

Roles obligatorios

Para obtener los permisos que necesitas para usar Spark o Flink y Dataproc con el metastore de BigQuery como un almacén de metadatos, pídele a tu administrador que te otorgue los siguientes roles de IAM:

Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

También puedes obtener los permisos necesarios mediante roles personalizados o cualquier otro rol predefinido.

Flujo de trabajo general

Para usar Dataproc en Compute Engine con el metastore de BigQuery, sigue estos pasos generales:

  1. Crea un clúster de Dataproc o configura uno existente.
  2. Conéctate al motor de software de código abierto que prefieras, como Spark o Flink.
  3. Usa un archivo JAR para instalar el complemento del catálogo de Apache Iceberg en el clúster.
  4. Crea y administra tus recursos de metastore de BigQuery según sea necesario, según el motor de software de código abierto que uses.
  5. En BigQuery, accede a tus recursos de metastore de BigQuery y úsalo.

Conecta BigQuery Metastore a Spark

En las siguientes instrucciones, se muestra cómo conectar Dataproc al metastore de BigQuery con Spark SQL interactivo.

Descarga el complemento del catálogo de Iceberg

Para conectar el metastore de BigQuery con Dataproc y Spark, debes usar el archivo jar del complemento del catálogo de Iceberg del metastore de BigQuery.

Este archivo se incluye de forma predeterminada en la versión 2.2 de la imagen de Dataproc. Si tus clústeres de Dataproc no tienen acceso directo a Internet, debes descargar el complemento y subirlo a un bucket de Cloud Storage al que pueda acceder tu clúster de Dataproc.

Descarga el complemento de catálogo de Apache Iceberg de BigQuery Metastore.

Configura un clúster de Dataproc

Antes de conectarte al metastore de BigQuery, debes configurar un clúster de Dataproc.

Para ello, puedes crear un clúster nuevo o usar uno existente. Luego, usarás este clúster para ejecutar Spark SQL interactivo y administrar tus recursos de metastore de BigQuery.

  • La subred de la región en la que se crea el clúster debe tener habilitado el Acceso privado a Google (PGA). De forma predeterminada, las VMs del clúster de Dataproc, creadas con una versión de imagen 2.2 (predeterminada) o posterior, solo tienen direcciones IP internas. Para permitir que las VMs del clúster se comuniquen con las APIs de Google, habilita el Acceso privado a Google en la subred de red default (o el nombre de red especificado por el usuario, si corresponde) en la región donde se crea el clúster.

  • Si deseas ejecutar el ejemplo de la interfaz web de Zeppelin en esta guía, debes usar o crear un clúster de Dataproc con el componente opcional de Zeppelin habilitado.

Clúster nuevo

Para crear un clúster de Dataproc nuevo, ejecuta el siguiente comando gcloud dataproc clusters create. Esta configuración contiene la configuración que necesitas para usar el metastore de BigQuery.

gcloud dataproc clusters create CLUSTER_NAME \
  --project=PROJECT_ID \
  --region=LOCATION \
  --optional-components=ZEPPELIN \
  --enable-component-gateway \
  --single-node

Reemplaza lo siguiente:

  • CLUSTER_NAME: Es un nombre para tu clúster de Dataproc.
  • PROJECT_ID: Es el ID del proyecto de Google Cloud en el que creas el clúster.
  • LOCATION: Es la Google Cloud región en la que crearás el clúster.

Clúster existente

Para configurar un clúster existente, agrega el siguiente tiempo de ejecución de Iceberg Spark a tu clúster.

org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1

Puedes agregar el entorno de ejecución con una de las siguientes opciones:

Enviar un trabajo de Spark

Para enviar un trabajo de Spark, usa uno de los siguientes métodos:

gcloud CLI

gcloud dataproc jobs submit spark-sql \
--project=PROJECT_ID \
--cluster=CLUSTER_NAME \
--region==REGION \
--jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
--properties=spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog, \
spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog, \
spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID, \
spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION, \
spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY \
--execute="SPARK_SQL_COMMAND"

Reemplaza lo siguiente:

  • PROJECT_ID: Es el ID del proyecto de Google Cloud que contiene el clúster de Dataproc.
  • CLUSTER_NAME: Es el nombre del clúster de Dataproc que usas para ejecutar el trabajo de Spark SQL.
  • REGION: La región de Compute Engine en la que se encuentra tu clúster.
  • LOCATION: la ubicación de los recursos de BigQuery.
  • CATALOG_NAME: Es el nombre del catálogo de Spark que usas con tu trabajo de SQL.
  • WAREHOUSE_DIRECTORY: Es la carpeta de Cloud Storage que contiene tu almacén de datos. Este valor comienza con gs://.
  • SPARK_SQL_COMMAND: Es la consulta de SQL de Spark que deseas ejecutar. Esta consulta incluye los comandos para crear tus recursos. Por ejemplo, para crear un espacio de nombres y una tabla.

Interactive Spark

Conéctate a Spark y, luego, instala el complemento de catálogo

Para instalar el complemento de catálogo para el almacén de metadatos de BigQuery, conéctate a tu clúster de Dataproc con SSH.

  1. En la consola de Google Cloud, ve a la página Instancias de VM.
  2. Para conectarte a una instancia de VM de Dataproc, haz clic en SSH en la lista de instancias de máquinas virtuales. El resultado es similar al siguiente:

    Connected, host fingerprint: ssh-rsa ...
    Linux cluster-1-m 3.16.0-0.bpo.4-amd64 ...
    ...
    example-cluster@cluster-1-m:~$
    
  3. En la terminal, ejecuta el siguiente comando de inicialización del metastore de BigQuery:

    spark-sql \
    --jars https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar \
    --conf spark.sql.catalog.CATALOG_NAME=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.CATALOG_NAME.catalog-impl=org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog \
    --conf spark.sql.catalog.CATALOG_NAME.gcp_project=PROJECT_ID \
    --conf spark.sql.catalog.CATALOG_NAME.gcp_location=LOCATION \
    --conf spark.sql.catalog.CATALOG_NAME.warehouse=WAREHOUSE_DIRECTORY

    Reemplaza lo siguiente:

    • CATALOG_NAME: Es el nombre del catálogo de Spark que usas con tu trabajo de SQL.
    • PROJECT_ID: Es el ID del proyecto de Google Cloud del catálogo de metastore de BigQuery con el que se vincula tu catálogo de Spark.
    • LOCATION: La Google Cloud ubicación del metastore de BigQuery.
    • WAREHOUSE_DIRECTORY: Es la carpeta de Cloud Storage que contiene tu almacén de datos. Este valor comienza con gs://.

    Después de conectarte correctamente a un clúster, la terminal de Spark mostrará el mensaje spark-sql.

    spark-sql (default)>
    

Administra recursos de metastore de BigQuery

Ahora estás conectado al metastore de BigQuery. Puedes ver tus recursos existentes o crear recursos nuevos según los metadatos almacenados en el metastore de BigQuery.

Por ejemplo, intenta ejecutar los siguientes comandos en la sesión interactiva de Spark SQL para crear un espacio de nombres y una tabla de Iceberg.

  • Usa el catálogo de Iceberg personalizado:

    USE `CATALOG_NAME`;
  • Crea un espacio de nombres:

    CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;
  • Usa el espacio de nombres creado:

    USE NAMESPACE_NAME;
  • Crea una tabla de Iceberg:

    CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;
  • Inserta una fila de la tabla:

    INSERT INTO TABLE_NAME VALUES (1, "first row");
  • Agrega una columna de tabla:

    ALTER TABLE TABLE_NAME ADD COLUMNS (newDoubleCol double);
  • Visualiza los metadatos de la tabla:

    DESCRIBE EXTENDED TABLE_NAME;
  • Obtén una lista de las tablas del espacio de nombres:

    SHOW TABLES;

Notebook de Zeppelin

  1. En la consola de Google Cloud, ve a la página Clústeres de Dataproc.

    Ir a Clústeres de Dataproc

  2. Haz clic en el nombre del clúster que deseas usar.

    Se abrirá la página Detalles del clúster.

  3. En el menú de navegación, haz clic en Interfaces web.

  4. En Puerta de enlace del componente, haz clic en Zeppelin. Se abrirá la página del notebook de Zeppelin.

  5. En el menú de navegación, haz clic en Cuaderno y, luego, en +Crear nota nueva.

  6. En el cuadro de diálogo, ingresa un nombre para el notebook. Deja seleccionado Spark como el intérprete predeterminado.

  7. Haz clic en Crear. Se crea un notebook nuevo.

  8. En el notebook, haz clic en el menú de configuración y, luego, en Intérprete.

  9. En el campo Buscar intérpretes, busca Spark.

  10. Haz clic en Editar.

  11. En el campo Spark.jars, ingresa el URI del jar de Spark.

    https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.6.1/iceberg-spark-runtime-3.5_2.12-1.6.1.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.6.1-1.0.1-beta.jar
    
  12. Haz clic en Guardar.

  13. Haz clic en Aceptar.

  14. Copia el siguiente código de PySpark en tu notebook de Zeppelin.

    %pyspark
    from pyspark.sql import SparkSession
    spark = SparkSession.builder \
    .appName("BigQuery Metastore Iceberg") \
    .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \
    .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \
    .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \
    .getOrCreate()
    spark.sql("select version()").show()
    spark.sql("USE `CATALOG_NAME`;")
    spark.sql("CREATE NAMESPACE IF NOT EXISTS NAMESPACE_NAME;")
    spark.sql("USE NAMESPACE_NAME;")
    spark.sql("CREATE TABLE TABLE_NAME (id int, data string) USING ICEBERG;")
    spark.sql("DESCRIBE TABLE_NAME;").show()

    Reemplaza lo siguiente:

    • CATALOG_NAME: Es el nombre del catálogo de Spark que se usará para la tarea de SQL.
    • PROJECT_ID: Es el ID del proyecto de Google Cloud que contiene el clúster de Dataproc.
    • WAREHOUSE_DIRECTORY: Es la carpeta de Cloud Storage que contiene tu almacén de datos. Este valor comienza con gs://.
    • NAMESPACE_NAME: Es el nombre del espacio de nombres que hace referencia a tu tabla de Spark.
    • WAREHOUSE_DIRECTORY: Es el URI de la carpeta de Cloud Storage en la que se almacena tu almacén de datos.
    • TABLE_NAME: Es un nombre de tabla para tu tabla de Spark.
  15. Haz clic en el ícono de ejecución o presiona Shift-Enter para ejecutar el código. Cuando se completa el trabajo, el mensaje de estado muestra "Spark Job Finished" y el resultado muestra el contenido de la tabla:

En las siguientes instrucciones, se muestra cómo conectar Dataproc al metastore de BigQuery con el cliente de SQL de Flink.

Para conectar el metastore de BigQuery a Flink, haz lo siguiente:

  1. Crea un clúster de Dataproc con el componente opcional de Flink habilitado y asegúrate de usar Dataproc 2.2 o una versión posterior.
  2. En la consola de Google Cloud, ve a la página Instancias de VM.

    Ir a Instancias de VM

  3. En la lista de instancias de máquinas virtuales, haz clic en SSH para conectarte a una instancia de VM de Dataproc.

  4. Configura el complemento de catálogo personalizado de Iceberg para el metastore de BigQuery:

    FLINK_VERSION=1.17
    ICEBERG_VERSION=1.5.2
    
    cd /usr/lib/flink
    
    sudo wget -c https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-${FLINK_VERSION}/${ICEBERG_VERSION}/iceberg-flink-runtime-${FLINK_VERSION}-${ICEBERG_VERSION}.jar -P lib
    
    sudo gcloud storage cp gs://spark-lib/bigquery/iceberg-bigquery-catalog-${ICEBERG_VERSION}-1.0.1-beta.jar lib/
  5. Inicia la sesión de Flink en YARN:

    HADOOP_CLASSPATH=`hadoop classpath`
    
    sudo bin/yarn-session.sh -nm flink-dataproc -d
    
    sudo bin/sql-client.sh embedded \
      -s yarn-session
  6. Crea un catálogo en Flink:

    CREATE CATALOG CATALOG_NAME WITH (
      'type'='iceberg',
      'warehouse'='WAREHOUSE_DIRECTORY',
      'catalog-impl'='org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog',
      'gcp_project'='PROJECT_ID',
      'gcp_location'='LOCATION'
    );

    Reemplaza lo siguiente:

    • CATALOG_NAME: Es el identificador del catálogo de Flink, que está vinculado a un catálogo de metastore de BigQuery.
    • WAREHOUSE_DIRECTORY: Es la ruta de acceso base del directorio del almacén (la carpeta de Cloud Storage en la que Flink crea archivos). Este valor comienza con gs://.
    • PROJECT_ID: Es el ID del proyecto del catálogo de metastore de BigQuery con el que se vincula el catálogo de Flink.
    • LOCATION: La ubicación de los recursos de BigQuery.

Tu sesión de Flink ahora está conectada al metastore de BigQuery, y puedes ejecutar comandos SQL de Flink.

Ahora que te conectaste al metastore de BigQuery, puedes crear y ver recursos según los metadatos almacenados en el metastore de BigQuery.

Por ejemplo, intenta ejecutar los siguientes comandos en tu sesión interactiva de SQL de Flink para crear una base de datos y una tabla de Iceberg.

  1. Usa el catálogo de Iceberg personalizado:

    USE CATALOG CATALOG_NAME;

    Reemplaza CATALOG_NAME por el identificador de tu catálogo de Flink.

  2. Crea una base de datos, que crea un conjunto de datos en BigQuery:

    CREATE DATABASE IF NOT EXISTS DATABASE_NAME;

    Reemplaza DATABASE_NAME por el nombre de tu base de datos nueva.

  3. Usa la base de datos que creaste:

    USE DATABASE_NAME;
  4. Crea una tabla de Iceberg. El siguiente comando crea una tabla de ventas de ejemplo:

    CREATE TABLE IF NOT EXISTS ICEBERG_TABLE_NAME (
        order_number BIGINT,
        price        DECIMAL(32,2),
        buyer        ROW<first_name STRING, last_name STRING>,
        order_time   TIMESTAMP(3)
    );

    Reemplaza ICEBERG_TABLE_NAME por un nombre para tu tabla nueva.

  5. Visualiza los metadatos de la tabla:

    DESCRIBE EXTENDED ICEBERG_TABLE_NAME;
  6. Haz una lista de las tablas de la base de datos:

    SHOW TABLES;

Transfiere datos a tu tabla

Después de crear una tabla Iceberg en la sección anterior, puedes usar Flink DataGen como fuente de datos para transferir datos en tiempo real a tu tabla. Los siguientes pasos son un ejemplo de este flujo de trabajo:

  1. Crea una tabla temporal con DataGen:

    CREATE TEMPORARY TABLE DATABASE_NAME.TEMP_TABLE_NAME
    WITH (
        'connector' = 'datagen',
        'rows-per-second' = '10',
        'fields.order_number.kind' = 'sequence',
        'fields.order_number.start' = '1',
        'fields.order_number.end' = '1000000',
        'fields.price.min' = '0',
        'fields.price.max' = '10000',
        'fields.buyer.first_name.length' = '10',
        'fields.buyer.last_name.length' = '10'
    )
    LIKE DATABASE_NAME.ICEBERG_TABLE_NAME (EXCLUDING ALL);

    Reemplaza lo siguiente:

    • DATABASE_NAME: Es el nombre de la base de datos en la que se almacenará tu tabla temporal.
    • TEMP_TABLE_NAME: Es un nombre para tu tabla temporal.
    • ICEBERG_TABLE_NAME: Es el nombre de la tabla de Iceberg que creaste en la sección anterior.
  2. Establece el paralelismo en 1:

    SET 'parallelism.default' = '1';
  3. Establece el intervalo de punto de control:

    SET 'execution.checkpointing.interval' = '10second';
  4. Configura el punto de control:

    SET 'state.checkpoints.dir' = 'hdfs:///flink/checkpoints';
  5. Inicia el trabajo de transmisión en tiempo real:

    INSERT INTO ICEBERG_TABLE_NAME SELECT * FROM TEMP_TABLE_NAME;

    El resultado es similar a este:

    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 0de23327237ad8a811d37748acd9c10b
    
  6. Para verificar el estado de la tarea de transmisión, haz lo siguiente:

    1. En la consola de Google Cloud, ve a la página Clústeres.

      Ir a los clústeres

    2. Selecciona tu clúster.

    3. Haz clic en la pestaña Interfaces web.

    4. Haz clic en el vínculo YARN ResourceManager.

    5. En la interfaz de YARN ResourceManager, busca tu sesión de Flink y haz clic en el vínculo ApplicationMaster en Tracking UI.

    6. En la columna Estado, confirma que el estado de tu trabajo sea En ejecución.

  7. Consulta datos de transmisión en el cliente de Flink SQL:

    SELECT * FROM ICEBERG_TABLE_NAME
    /*+ OPTIONS('streaming'='true', 'monitor-interval'='3s')*/
    ORDER BY order_time desc
    LIMIT 20;
  8. Consulta datos de transmisión en BigQuery:

    SELECT * FROM `DATABASE_NAME.ICEBERG_TABLE_NAME`
    ORDER BY order_time desc
    LIMIT 20;
  9. Finaliza el trabajo de transmisión en el cliente de SQL de Flink:

    STOP JOB 'JOB_ID';

    Reemplaza JOB_ID por el ID de trabajo que se mostró en el resultado cuando creaste el trabajo de transmisión.

¿Qué sigue?