Usa el conector de Bigtable Spark

El conector de Bigtable para Spark te permite leer y escribir datos en Bigtable. Puedes leer datos desde tu aplicación Spark con Spark SQL y DataFrames. Se admiten las siguientes operaciones de Bigtable con el conector de Bigtable Spark:

  • Escribe datos
  • Lee datos
  • Crea una tabla nueva

En este documento, se muestra cómo convertir una tabla de DataFrames de Spark SQL en una tabla de Bigtable y, luego, compilar y crear un archivo JAR para enviar un trabajo de Spark.

Estado de compatibilidad con Spark y Scala

El conector de Bigtable para Spark admite las siguientes versiones de Scala:

El conector de Bigtable Spark admite las siguientes versiones de Spark:

El conector de Bigtable Spark admite las siguientes versiones de Dataproc:

Calcula los costos

Si decides usar cualquiera de los siguientes componentes facturables de Google Cloud, se te facturarán los recursos que uses:

  • Bigtable (no se te cobra por usar el emulador de Bigtable)
  • Dataproc
  • Cloud Storage

Los precios de Dataproc se aplican al uso de clústeres de Dataproc en Compute Engine. Los precios de Dataproc Serverless se aplican a las cargas de trabajo y las sesiones que se ejecutan en Dataproc Serverless para Spark.

Para generar una estimación de costos en función del uso previsto, usa la calculadora de precios.

Antes de comenzar

Completa los siguientes requisitos previos antes de usar el conector de Bigtable Spark.

Roles obligatorios

Para obtener los permisos que necesitas para usar el conector de Bigtable para Spark, pídele a tu administrador que te otorgue los siguientes roles de IAM en tu proyecto:

  • Administrador de Bigtable (roles/bigtable.admin)(opcional): Te permite leer o escribir datos y crear una tabla nueva.
  • Usuario de Bigtable (roles/bigtable.user): Te permite leer o escribir datos, pero no crear una tabla nueva.

Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

También puedes obtener los permisos necesarios mediante roles personalizados o cualquier otro rol predefinido.

Si usas Dataproc o Cloud Storage, es posible que se requieran permisos adicionales. Para obtener más información, consulta los permisos de Dataproc y Cloud Storage.

Configura Spark

Además de crear una instancia de Bigtable, también debes configurar tu instancia de Spark. Puedes hacerlo de forma local o seleccionar cualquiera de estas opciones para usar Spark con Dataproc:

  • Clúster de Dataproc
  • Dataproc sin servidores

Para obtener más información sobre cómo elegir entre un clúster de Dataproc o una opción sin servidores, consulta la documentación de Comparación entre Dataproc sin servidores para Spark y Dataproc en Compute Engine .

Descarga el archivo JAR del conector

Puedes encontrar el código fuente del conector de Bigtable Spark con ejemplos en el repositorio de GitHub del conector de Bigtable Spark.

Según tu configuración de Spark, puedes acceder al archivo JAR de la siguiente manera:

  • Si ejecutas PySpark de forma local, debes descargar el archivo JAR del conector desde la ubicación de Cloud Storage gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar.

    Reemplaza SCALA_VERSION por 2.12 o 2.13, que son las únicas versiones de Scala compatibles, y reemplaza CONNECTOR_VERSION por la versión del conector que deseas usar.

  • Para la opción de clúster o sin servidores de Dataproc, usa el archivo JAR más reciente como un artefacto que se puede agregar en tus aplicaciones de Spark en Scala o Java. Para obtener más información sobre el uso del archivo JAR como artefacto, consulta Administra dependencias.

  • Si envías tu trabajo de PySpark a Dataproc, usa la marca gcloud dataproc jobs submit pyspark --jars para establecer el URI en la ubicación del archivo JAR en Cloud Storage, por ejemplo, gs://spark-lib/bigtable/spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar.

Cómo determinar el tipo de procesamiento

Para los trabajos de solo lectura, puedes usar la computación sin servidores de Data Boost, que te permite evitar el impacto en los clústeres que publican tu aplicación. Tu aplicación de Spark debe usar la versión 1.1.0 o posterior del conector de Spark para usar Data Boost.

Para usar Data Boost, debes crear un perfil de app de Data Boost y, luego, proporcionar el ID del perfil de app para la opción de Spark spark.bigtable.app_profile.id cuando agregues tu configuración de Bigtable a tu aplicación de Spark. Si ya creaste un perfil de app para tus trabajos de lectura de Spark y quieres seguir usándolo sin cambiar el código de tu aplicación, puedes convertir el perfil de app en un perfil de app de Data Boost. Para obtener más información, consulta Cómo convertir un perfil de app.

Para obtener más información, consulta la descripción general de Bigtable Data Boost.

En el caso de los trabajos que implican lecturas y escrituras, puedes usar los nodos del clúster de tu instancia para el procesamiento. Para ello, especifica un perfil de aplicación estándar con tu solicitud.

Identifica o crea un perfil de app para usar

Si no especificas un ID de perfil de app, el conector usa el perfil de app predeterminado.

Te recomendamos que uses un perfil de aplicación único para cada aplicación que ejecutes, incluida tu aplicación de Spark. Si deseas obtener más información sobre los tipos y la configuración de los perfiles de aplicación, consulta la Descripción general de los perfiles de aplicación. Para obtener instrucciones, consulta Crea y configura perfiles de app.

Agrega la configuración de Bigtable a tu aplicación de Spark

En tu aplicación de Spark, agrega las opciones de Spark que te permiten interactuar con Bigtable.

Opciones de Spark compatibles

Usa las opciones de Spark que están disponibles como parte del paquete com.google.cloud.spark.bigtable.

Nombre de la opción Obligatorio Valor predeterminado Significado
spark.bigtable.project.id N/A Configura el ID del proyecto de Bigtable.
spark.bigtable.instance.id N/A Establece el ID de la instancia de Bigtable.
catalog N/A Establece el formato JSON que especifica el formato de conversión entre el esquema similar a SQL del DataFrame y el esquema de la tabla de Bigtable.

Consulta Crea metadatos de tablas en formato JSON para obtener más información.
spark.bigtable.app_profile.id No default Establece el ID del perfil de la aplicación de Bigtable.
spark.bigtable.write.timestamp.milliseconds No Hora actual del sistema Establece la marca de tiempo en milisegundos que se usará cuando se escriba un DataFrame en Bigtable.

Ten en cuenta que, dado que todas las filas del DataFrame usan la misma marca de tiempo, las filas con la misma columna de clave de fila en el DataFrame persisten como una sola versión en Bigtable, ya que comparten la misma marca de tiempo.
spark.bigtable.create.new.table No false Se establece en true para crear una tabla nueva antes de escribir en Bigtable.
spark.bigtable.read.timerange.start.milliseconds o spark.bigtable.read.timerange.end.milliseconds No N/A Establece marcas de tiempo (en milisegundos desde la hora de época) para filtrar celdas con una fecha de inicio y una fecha de finalización específicas, respectivamente.
spark.bigtable.push.down.row.key.filters No true Se establece en true para permitir el filtrado simple de clave de fila en el servidor. El filtrado en claves de fila compuestas se implementa del lado del cliente.

Consulta Cómo leer una fila específica de DataFrame con un filtro para obtener más información.
spark.bigtable.read.rows.attempt.timeout.milliseconds No 30 min Establece la duración del tiempo de espera para un intento de lectura de filas correspondiente a una partición de DataFrame en el cliente de Bigtable para Java.
spark.bigtable.read.rows.total.timeout.milliseconds No 12 h Establece la duración del tiempo de espera total para un intento de lectura de filas correspondiente a una partición de DataFrame en el cliente de Bigtable para Java.
spark.bigtable.mutate.rows.attempt.timeout.milliseconds No 1 min Establece la duración del tiempo de espera para un intento de mutar filas correspondiente a una partición de DataFrame en el cliente de Bigtable para Java.
spark.bigtable.mutate.rows.total.timeout.milliseconds No 10 min Establece la duración del tiempo de espera total para un intento de mutar filas correspondiente a una partición de DataFrame en el cliente de Bigtable para Java.
spark.bigtable.batch.mutate.size No 100 Se establece en la cantidad de mutaciones en cada lote. El valor máximo que puedes establecer es 100000.
spark.bigtable.enable.batch_mutate.flow_control No false Establécelo en true para habilitar el control de flujo para las mutaciones por lotes.

Crea metadatos de tablas en formato JSON

El formato de tabla de DataFrames de Spark SQL debe convertirse en una tabla de Bigtable con una cadena en formato JSON. Este formato JSON de cadena hace que el formato de datos sea compatible con Bigtable. Puedes pasar el formato JSON en el código de tu aplicación con la opción .option("catalog", catalog_json_string).

Como ejemplo, considera la siguiente tabla de DataFrame y la tabla de Bigtable correspondiente.

En este ejemplo, las columnas name y birthYear del DataFrame se agrupan en la familia de columnas info y se les cambia el nombre a name y birth_year, respectivamente. Del mismo modo, la columna address se almacena en la familia de columnas location con el mismo nombre de columna. La columna id del DataFrame se convierte en la clave de fila de Bigtable.

Las claves de fila no tienen un nombre de columna dedicado en Bigtable y, en este ejemplo, id_rowkey solo se usa para indicarle al conector que esta es la columna de clave de fila. Puedes usar cualquier nombre para la columna de clave de fila y asegurarte de usar el mismo nombre cuando declares el campo "rowkey":"column_name" en el formato JSON.

DataFrame Tabla de Bigtable = t1
Columnas Clave de fila Familias de columnas
información ubicación
Columnas Columnas
id name birthYear address id_rowkey name birth_year address

El formato JSON del catálogo es el siguiente:

    """
    {
      "table": {"name": "t1"},
      "rowkey": "id_rowkey",
      "columns": {
        "id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"},
        "name": {"cf": "info", "col": "name", "type": "string"},
        "birthYear": {"cf": "info", "col": "birth_year", "type": "long"},
        "address": {"cf": "location", "col": "address", "type": "string"}
      }
    }
    """

Las claves y los valores que se usan en el formato JSON son los siguientes:

Clave del catálogo Valor del catálogo Formato JSON
tabla Es el nombre de la tabla de Bigtable. "table":{"name":"t1"}

Si la tabla no existe, usa .option("spark.bigtable.create.new.table", "true") para crearla.
rowkey Nombre de la columna que se usará como clave de fila de Bigtable. Asegúrate de que el nombre de la columna del DataFrame se use como la clave de fila, por ejemplo, id_rowkey.

También se aceptan claves compuestas como claves de fila. Por ejemplo, "rowkey":"name:address". Este enfoque puede generar claves de filas que requieran un análisis completo de la tabla para todas las solicitudes de lectura.
"rowkey":"id_rowkey",
columnas Es la asignación de cada columna del DataFrame a la familia de columnas ("cf") y al nombre de la columna ("col") correspondientes de Bigtable. El nombre de la columna puede ser diferente del nombre de la columna en la tabla del DataFrame. Los tipos de datos admitidos incluyen string, long y binary. "columns": {"id": {"cf": "rowkey", "col": "id_rowkey", "type": "string"}, "name": {"cf": "info", "col": "name", "type": "string"}, "birthYear": {"cf":"info", "col": "birth_year", "type": "long"}, "address": {"cf": "location", "col": "address", "type":"string"}}"

En este ejemplo, id_rowkey es la clave de fila, y info y location son las familias de columnas.

Tipos de datos admitidos

El conector admite el uso de los tipos string, long y binary (array de bytes) en el catálogo. Hasta que se agregue compatibilidad con otros tipos, como int y float, puedes convertir manualmente esos tipos de datos en arrays de bytes (BinaryType de Spark SQL) antes de usar el conector para escribirlos en Bigtable.

Además, puedes usar Avro para serializar tipos complejos, como ArrayType. Para obtener más información, consulta Serializa tipos de datos complejos con Apache Avro.

Escribe en Bigtable

Usa la función .write() y las opciones admitidas para escribir tus datos en Bigtable.

Java

El siguiente código del repositorio de GitHub usa Java y Maven para escribir en Bigtable.

  String catalog = "{" +
        "\"table\":{\"name\":\"" + tableName + "\"," +
        "\"tableCoder\":\"PrimitiveType\"}," +
        "\"rowkey\":\"wordCol\"," +
        "\"columns\":{" +
        "\"word\":{\"cf\":\"rowkey\", \"col\":\"wordCol\", \"type\":\"string\"}," +
        "\"count\":{\"cf\":\"example_family\", \"col\":\"countCol\", \"type\":\"long\"}" +
        "}}".replaceAll("\\s+", "");



  private static void writeDataframeToBigtable(Dataset<Row> dataframe, String catalog,
        String createNewTable) {
      dataframe
          .write()
          .format("bigtable")
          .option("catalog", catalog)
          .option("spark.bigtable.project.id", projectId)
          .option("spark.bigtable.instance.id", instanceId)
          .option("spark.bigtable.create.new.table", createNewTable)
          .save();
    }

Python

El siguiente código del repositorio de GitHub usa Python para escribir en Bigtable.

  catalog = ''.join(("""{
        "table":{"name":" """ + bigtable_table_name + """
        ", "tableCoder":"PrimitiveType"},
        "rowkey":"wordCol",
        "columns":{
          "word":{"cf":"rowkey", "col":"wordCol", "type":"string"},
          "count":{"cf":"example_family", "col":"countCol", "type":"long"}
        }
        }""").split())
  

  input_data = spark.createDataFrame(data)
  print('Created the DataFrame:')
  input_data.show()

  input_data.write \
        .format('bigtable') \
        .options(catalog=catalog) \
        .option('spark.bigtable.project.id', bigtable_project_id) \
        .option('spark.bigtable.instance.id', bigtable_instance_id) \
        .option('spark.bigtable.create.new.table', create_new_table) \
        .save()
  print('DataFrame was written to Bigtable.')

  

Lee desde Bigtable

Usa la función .read() para verificar si la tabla se importó correctamente a Bigtable.

Java

  
  private static Dataset<Row> readDataframeFromBigtable(String catalog) {
      Dataset<Row> dataframe = spark
          .read()
          .format("bigtable")
          .option("catalog", catalog)
          .option("spark.bigtable.project.id", projectId)
          .option("spark.bigtable.instance.id", instanceId)
          .load();
      return dataframe;
    }

Python

  

  records = spark.read \
        .format('bigtable') \
        .option('spark.bigtable.project.id', bigtable_project_id) \
        .option('spark.bigtable.instance.id', bigtable_instance_id) \
        .options(catalog=catalog) \
        .load()

  print('Reading the DataFrame from Bigtable:')
  records.show()

Compila tu proyecto

Genera el archivo JAR que se usa para ejecutar un trabajo en un clúster de Dataproc, Dataproc Serverless o una instancia local de Spark. Puedes compilar el archivo JAR de forma local y, luego, usarlo para enviar un trabajo. La ruta de acceso al archivo JAR compilado se establece como la variable de entorno PATH_TO_COMPILED_JAR cuando envías un trabajo.

Este paso no se aplica a las aplicaciones de PySpark.

Administrar dependencias

El conector de Bigtable Spark admite las siguientes herramientas de administración de dependencias:

Compila el archivo JAR

Maven

  1. Agrega la dependencia spark-bigtable al archivo pom.xml.

    <dependencies>
    <dependency>
      <groupId>com.google.cloud.spark.bigtable</groupId>
      <artifactId>spark-bigtable_SCALA_VERSION</artifactId>
      <version>0.1.0</version>
    </dependency>
    </dependencies>
    
  2. Agrega el complemento Maven Shade a tu archivo pom.xml para crear un archivo uber JAR:

    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.4</version>
        <executions>
          <execution>
            <phase>package</phase>
            <goals>
              <goal>shade</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
    
  3. Ejecuta el comando mvn clean install para generar un archivo JAR.

sbt

  1. Agrega la dependencia spark-bigtable a tu archivo build.sbt:

    libraryDependencies += "com.google.cloud.spark.bigtable" % "spark-bigtable_SCALA_VERSION" % "0.1.0{""}}"
  2. Agrega el complemento sbt-assembly a tu archivo project/plugins.sbt o project/assembly.sbt para crear un archivo Uber JAR.

    addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")
  3. Ejecuta el comando sbt clean assembly para generar el archivo JAR.

Gradle

  1. Agrega la dependencia spark-bigtable al archivo build.gradle.

    dependencies {
    implementation group: 'com.google.cloud.bigtable', name: 'spark-bigtable_SCALA_VERSION', version: '0.1.0'
    }
  2. Agrega el complemento Shadow en tu archivo build.gradle para crear un archivo uber JAR:

    plugins {
    id 'com.github.johnrengelman.shadow' version '8.1.1'
    id 'java'
    }
  3. Consulta la documentación del complemento Shadow para obtener más información sobre la configuración y la compilación de JAR.

Envía un trabajo

Envía un trabajo de Spark con Dataproc, Dataproc Serverless o una instancia local de Spark para iniciar tu aplicación.

Configura el entorno de ejecución

Configura las siguientes variables de entorno:

      #Google Cloud
      export BIGTABLE_SPARK_PROJECT_ID=PROJECT_ID
      export BIGTABLE_SPARK_INSTANCE_ID=INSTANCE_ID
      export BIGTABLE_SPARK_TABLE_NAME=TABLE_NAME
      export BIGTABLE_SPARK_DATAPROC_CLUSTER=DATAPROC_CLUSTER
      export BIGTABLE_SPARK_DATAPROC_REGION=DATAPROC_REGION
      export BIGTABLE_SPARK_DATAPROC_ZONE=DATAPROC_ZONE

      #Dataproc Serverless
      export BIGTABLE_SPARK_SUBNET=SUBNET
      export BIGTABLE_SPARK_GCS_BUCKET_NAME=GCS_BUCKET_NAME

      #Scala/Java
      export PATH_TO_COMPILED_JAR=PATH_TO_COMPILED_JAR

      #PySpark
      export GCS_PATH_TO_CONNECTOR_JAR=GCS_PATH_TO_CONNECTOR_JAR
      export PATH_TO_PYTHON_FILE=PATH_TO_PYTHON_FILE
      export LOCAL_PATH_TO_CONNECTOR_JAR=LOCAL_PATH_TO_CONNECTOR_JAR

Reemplaza lo siguiente:

  • PROJECT_ID: Es el identificador permanente del proyecto de Bigtable.
  • INSTANCE_ID: Es el identificador permanente de la instancia de Bigtable.
  • TABLE_NAME: Es el identificador permanente de la tabla.
  • DATAPROC_CLUSTER: Es el identificador permanente del clúster de Dataproc.
  • DATAPROC_REGION: Es la región de Dataproc que contiene uno de los clústeres en tu instancia de Dataproc, por ejemplo, northamerica-northeast2.
  • DATAPROC_ZONE: Es la zona en la que se ejecuta el clúster de Dataproc.
  • SUBNET: Es la ruta de acceso completa del recurso de la subred.
  • GCS_BUCKET_NAME: Es el bucket de Cloud Storage en el que se subirán las dependencias de la carga de trabajo de Spark.
  • PATH_TO_COMPILED_JAR: Es la ruta de acceso completa o relativa al archivo JAR compilado, por ejemplo, /path/to/project/root/target/<compiled_JAR_name> para Maven.
  • GCS_PATH_TO_CONNECTOR_JAR: Es el bucket de Cloud Storage gs://spark-lib/bigtable en el que se encuentra el archivo spark-bigtable_SCALA_VERSION-CONNECTOR_VERSION.jar.
  • PATH_TO_PYTHON_FILE: En el caso de las aplicaciones de PySpark, es la ruta de acceso al archivo de Python que se usará para escribir datos en Bigtable y leerlos desde allí.
  • LOCAL_PATH_TO_CONNECTOR_JAR: Para las aplicaciones de PySpark, es la ruta de acceso al archivo JAR del conector de Bigtable para Spark descargado.

Enviar un trabajo de Spark

En el caso de las instancias de Dataproc o tu configuración local de Spark, ejecuta un trabajo de Spark para subir datos a Bigtable.

Clúster de Dataproc

Usa el archivo JAR compilado y crea un trabajo del clúster de Dataproc que lea y escriba datos desde y hacia Bigtable.

  1. Crea un clúster de Dataproc. En el siguiente ejemplo, se muestra un comando de muestra para crear un clúster de Dataproc v2.0 con Debian 10, dos nodos trabajadores y configuraciones predeterminadas.

    gcloud dataproc clusters create \
      $BIGTABLE_SPARK_DATAPROC_CLUSTER --region $BIGTABLE_SPARK_DATAPROC_REGION \
      --zone $BIGTABLE_SPARK_DATAPROC_ZONE \
      --master-machine-type n2-standard-4 --master-boot-disk-size 500 \
      --num-workers 2 --worker-machine-type n2-standard-4 --worker-boot-disk-size 500 \
      --image-version 2.0-debian10 --project $BIGTABLE_SPARK_PROJECT_ID
    
  2. Enviar un trabajo

    Scala/Java

    En el siguiente ejemplo, se muestra la clase spark.bigtable.example.WordCount que incluye la lógica para crear una tabla de prueba en DataFrame, escribir la tabla en Bigtable y, luego, contar la cantidad de palabras en la tabla.

        gcloud dataproc jobs submit spark \
        --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \
        --region=$BIGTABLE_SPARK_DATAPROC_REGION \
        --class=spark.bigtable.example.WordCount \
        --jar=$PATH_TO_COMPILED_JAR \
        -- \
        $BIGTABLE_SPARK_PROJECT_ID \
        $BIGTABLE_SPARK_INSTANCE_ID \
        $BIGTABLE_SPARK_TABLE_NAME \
    

    PySpark

        gcloud dataproc jobs submit pyspark \
        --cluster=$BIGTABLE_SPARK_DATAPROC_CLUSTER \
        --region=$BIGTABLE_SPARK_DATAPROC_REGION \
        --jars=$GCS_PATH_TO_CONNECTOR_JAR \
        --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
        $PATH_TO_PYTHON_FILE \
        -- \
        --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
        --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
        --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME \
    

Dataproc sin servidores

Usa el archivo JAR compilado y crea un trabajo de Dataproc que lea y escriba datos desde y hacia Bigtable con una instancia de Dataproc Serverless.

Scala/Java

  gcloud dataproc batches submit spark \
  --region=$BIGTABLE_SPARK_DATAPROC_REGION \
  --subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
  --deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME --jar=$PATH_TO_COMPILED_JAR \
  --  \
  $BIGTABLE_SPARK_PROJECT_ID \
  $BIGTABLE_SPARK_INSTANCE_ID \
  $BIGTABLE_SPARK_TABLE_NAME

PySpark

  gcloud dataproc batches submit pyspark $PATH_TO_PYTHON_FILE \
  --region=$BIGTABLE_SPARK_DATAPROC_REGION \
  --subnet=$BIGTABLE_SPARK_SUBNET --version=1.1 \
  --deps-bucket=gs://$BIGTABLE_SPARK_GCS_BUCKET_NAME \
  --jars=$GCS_PATH_TO_CONNECTOR_JAR \
  --properties='spark.jars.packages=org.slf4j:slf4j-reload4j:1.7.36' \
  -- \
  --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
  --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
  --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME

Spark local

Usa el archivo JAR descargado y crea un trabajo de Spark que lea y escriba datos desde y hacia Bigtable con una instancia local de Spark. También puedes usar el emulador de Bigtable para enviar el trabajo de Spark.

Usa el emulador de Bigtable

Si decides usar el emulador de Bigtable, sigue estos pasos:

  1. Ejecuta el siguiente comando para iniciar el emulador

    gcloud beta emulators bigtable start
    

    De forma predeterminada, el emulador elige localhost:8086.

  2. Establece la variable de entorno BIGTABLE_EMULATOR_HOST:

    export BIGTABLE_EMULATOR_HOST=localhost:8086
    
  3. Envía el trabajo de Spark.

Para obtener más información sobre cómo usar el emulador de Bigtable, consulta Cómo realizar pruebas con el emulador.

Enviar un trabajo de Spark

Usa el comando spark-submit para enviar un trabajo de Spark, independientemente de si usas un emulador de Bigtable local.

Scala/Java

  spark-submit $PATH_TO_COMPILED_JAR \
  $BIGTABLE_SPARK_PROJECT_ID \
  $BIGTABLE_SPARK_INSTANCE_ID \
  $BIGTABLE_SPARK_TABLE_NAME

PySpark

  spark-submit \
  --jars=$LOCAL_PATH_TO_CONNECTOR_JAR \
  --packages=org.slf4j:slf4j-reload4j:1.7.36 \
  $PATH_TO_PYTHON_FILE \
  --bigtableProjectId=$BIGTABLE_SPARK_PROJECT_ID \
  --bigtableInstanceId=$BIGTABLE_SPARK_INSTANCE_ID \
  --bigtableTableName=$BIGTABLE_SPARK_TABLE_NAME

Verifica los datos de la tabla

Ejecuta el siguiente comando de la CLI de cbt para verificar que los datos se escriban en Bigtable. La CLI de cbt es un componente de Google Cloud CLI. Para obtener más información, consulta la descripción general de la CLI de cbt.

    cbt -project=$BIGTABLE_SPARK_PROJECT_ID -instance=$BIGTABLE_SPARK_INSTANCE_ID \
    read $BIGTABLE_SPARK_TABLE_NAME

Soluciones adicionales

Usa el conector de Bigtable Spark para soluciones específicas, como la serialización de tipos complejos de Spark SQL, la lectura de filas específicas y la generación de métricas del cliente.

Cómo leer una fila específica de DataFrame con un filtro

Cuando usas DataFrames para leer datos de Bigtable, puedes especificar un filtro para leer solo filas específicas. Los filtros simples, como ==, <= y startsWith en la columna de clave de fila, se aplican en el servidor para evitar un análisis completo de la tabla. Los filtros en claves de fila compuestas o los filtros complejos, como el filtro LIKE en la columna de clave de fila, se aplican en el cliente.

Si lees tablas grandes, te recomendamos que uses filtros de clave de fila simples para evitar realizar un análisis completo de la tabla. En la siguiente instrucción de ejemplo, se muestra cómo leer con un filtro simple. Asegúrate de que, en tu filtro de Spark, uses el nombre de la columna del DataFrame que se convierte en la clave de fila:

    dataframe.filter("id == 'some_id'").show()
  

Cuando apliques un filtro, usa el nombre de la columna del DataFrame en lugar del nombre de la columna de la tabla de Bigtable.

Serializa tipos de datos complejos con Apache Avro

El conector de Bigtable para Spark proporciona compatibilidad para usar Apache Avro para serializar tipos de Spark SQL complejos, como ArrayType, MapType o StructType. Apache Avro proporciona serialización de datos para los datos de registros que se usan comúnmente para procesar y almacenar estructuras de datos complejas.

Usa una sintaxis como "avro":"avroSchema" para especificar que una columna de Bigtable se debe codificar con Avro. Luego, puedes usar .option("avroSchema", avroSchemaString) cuando leas o escribas en Bigtable para especificar el esquema Avro correspondiente a esa columna en formato de cadena. Puedes usar diferentes nombres de opciones, por ejemplo, "anotherAvroSchema" para diferentes columnas y pasar esquemas de Avro para varias columnas.

def catalogWithAvroColumn = s"""{
                    |"table":{"name":"ExampleAvroTable"},
                    |"rowkey":"key",
                    |"columns":{
                    |"col0":{"cf":"rowkey", "col":"key", "type":"string"},
                    |"col1":{"cf":"cf1", "col":"col1", "avro":"avroSchema"}
                    |}
                    |}""".stripMargin

Usa métricas del cliente

Dado que el conector de Bigtable Spark se basa en el cliente de Bigtable para Java, las métricas del cliente están habilitadas de forma predeterminada dentro del conector. Puedes consultar la documentación sobre las métricas del cliente para obtener más detalles sobre cómo acceder a estas métricas y cómo interpretarlas.

Usa el cliente de Bigtable para Java con funciones de RDD de bajo nivel

Dado que el conector de Bigtable para Spark se basa en el cliente de Bigtable para Java, puedes usar el cliente directamente en tus aplicaciones de Spark y realizar solicitudes de lectura o escritura distribuidas dentro de las funciones de RDD de bajo nivel, como mapPartitions y foreachPartition.

Para usar las clases del cliente de Bigtable para Java, agrega el prefijo com.google.cloud.spark.bigtable.repackaged a los nombres de los paquetes. Por ejemplo, en lugar de usar el nombre de la clase como com.google.cloud.bigtable.data.v2.BigtableDataClient, usa com.google.cloud.spark.bigtable.repackaged.com.google.cloud.bigtable.data.v2.BigtableDataClient.

Para obtener más información sobre el cliente de Bigtable para Java, consulta el cliente de Bigtable para Java.

¿Qué sigue?