Escribir una tarea de MapReduce con el conector de BigQuery

El conector Hadoop BigQuery está instalado de forma predeterminada en todos los nodos de clúster de Dataproc 1.0-1.2 en /usr/lib/hadoop/lib/. Está disponible en entornos de Spark y PySpark.

Versiones de imagen de Dataproc 1.5 y posteriores: el conector de BigQuery no está instalado de forma predeterminada en las versiones de imagen 1.5 y posteriores de Dataproc. Para usarlo con estas versiones, sigue estos pasos:

  1. Instala el conector de BigQuery mediante esta acción de inicialización.

  2. Especifica el conector de BigQuery en el parámetro jars al enviar un trabajo:

    --jars=gs://hadoop-lib/bigquery/bigquery-connector-hadoop3-latest.jar

  3. Incluye las clases del conector de BigQuery en el archivo jar-with-dependencies de la aplicación.

Para evitar conflictos: si tu aplicación usa una versión de conector diferente de la versión de conector desplegada en tu clúster de Dataproc, debes hacer lo siguiente:

  1. Crea un clúster con una acción de inicialización que instale la versión del conector que usa tu aplicación.

  2. Incluye y reubica las clases de conectores y las dependencias de conectores de la versión que estés usando en el archivo JAR de tu aplicación para evitar conflictos entre la versión del conector y la versión del conector implementada en tu clúster de Dataproc (consulta este ejemplo de reubicación de dependencias en Maven).

Clase GsonBigQueryInputFormat

GsonBigQueryInputFormat proporciona a Hadoop los objetos de BigQuery en formato JsonObject mediante las siguientes operaciones principales:

  • Usar una consulta especificada por el usuario para seleccionar objetos de BigQuery
  • Dividir los resultados de la consulta de forma equitativa entre los nodos de Hadoop
  • Analiza las divisiones en objetos Java para pasarlas al Mapper. La clase Mapper de Hadoop recibe una representación JsonObject de cada objeto de BigQuery seleccionado.

La clase BigQueryInputFormat proporciona acceso a los registros de BigQuery a través de una extensión de la clase InputFormat de Hadoop. Para usar la clase BigQueryInputFormat, haz lo siguiente:

  1. Se deben añadir líneas a la tarea principal de Hadoop para definir los parámetros en la configuración de Hadoop.

  2. La clase InputFormat debe tener el valor GsonBigQueryInputFormat.

En las secciones que se indican a continuación, se explica cómo cumplir estos requisitos.

Parámetros de entrada

QualifiedInputTableId
Tabla de BigQuery de la que se leerán los datos, con el siguiente formato: optional-projectId:datasetId.tableId
Ejemplo: publicdata:samples.shakespeare
ID del proyecto
El ID de proyecto de BigQuery en el que se producen todas las operaciones de entrada.
Ejemplo: my-first-cloud-project
// Set the job-level projectId.
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);

// Configure input parameters.
BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);

// Set InputFormat.
job.setInputFormatClass(GsonBigQueryInputFormat.class);

Notas:

  • job hace referencia a org.apache.hadoop.mapreduce.Job, la tarea de Hadoop que se va a ejecutar.
  • conf hace referencia al org.apache.hadoop.Configuration de la tarea de Hadoop.

Mapper

La clase GsonBigQueryInputFormat lee datos de BigQuery y transfiere objetos de BigQuery de uno en uno como entrada a la función Mapper de Hadoop. Las entradas tienen la forma de un par que incluye lo siguiente:

  • LongWritable, el número de registro
  • JsonObject, el registro de BigQuery con formato JSON

Mapper acepta LongWritable y JsonObject pair como entrada.

A continuación, se muestra un fragmento de Mapper para una tarea de recuento de palabras de ejemplo.

  // private static final LongWritable ONE = new LongWritable(1);
  // The configuration key used to specify the BigQuery field name
  // ("column name").
  public static final String WORDCOUNT_WORD_FIELDNAME_KEY =
      "mapred.bq.samples.wordcount.word.key";

  // Default value for the configuration entry specified by
  // WORDCOUNT_WORD_FIELDNAME_KEY. Examples: 'word' in
  // publicdata:samples.shakespeare or 'repository_name'
  // in publicdata:samples.github_timeline.
  public static final String WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT = "word";

  /**
   * The mapper function for WordCount.
   */
  public static class Map
      extends Mapper <LongWritable, JsonObject, Text, LongWritable> {
    private static final LongWritable ONE = new LongWritable(1);
    private Text word = new Text();
    private String wordKey;

    @Override
    public void setup(Context context)
        throws IOException, InterruptedException {
      // Find the runtime-configured key for the field name we're looking for
      // in the map task.
      Configuration conf = context.getConfiguration();
      wordKey = conf.get(WORDCOUNT_WORD_FIELDNAME_KEY,
          WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT);
    }

    @Override
    public void map(LongWritable key, JsonObject value, Context context)
        throws IOException, InterruptedException {
      JsonElement countElement = value.get(wordKey);
      if (countElement != null) {
        String wordInRecord = countElement.getAsString();
        word.set(wordInRecord);
        // Write out the key, value pair (write out a value of 1, which will be
        // added to the total count for this word in the Reducer).
        context.write(word, ONE);
      }
    }
  }

Clase IndirectBigQueryOutputFormat

IndirectBigQueryOutputFormat proporciona a Hadoop la capacidad de escribir valores de JsonObject directamente en una tabla de BigQuery. Esta clase proporciona acceso a los registros de BigQuery a través de una extensión de la clase OutputFormat de Hadoop. Para usarlo correctamente, se deben definir varios parámetros en la configuración de Hadoop y la clase OutputFormat debe ser IndirectBigQueryOutputFormat. A continuación, se muestra un ejemplo de los parámetros que se deben definir y las líneas de código necesarias para usar IndirectBigQueryOutputFormat correctamente.

Parámetros de salida

ID del proyecto
El ID del proyecto de BigQuery en el que se realizan todas las operaciones de salida.
Ejemplo: "my-first-cloud-project"
QualifiedOutputTableId
El conjunto de datos de BigQuery en el que se escribirán los resultados finales del trabajo, con el formato optional-projectId:datasetId.tableId. El valor de datasetId ya debe estar presente en tu proyecto. Se creará el conjunto de datos outputDatasetId_hadoop_temporary en BigQuery para los resultados temporales. Asegúrate de que no entre en conflicto con ningún otro conjunto de datos.
Ejemplos: test_output_dataset.wordcount_output
my-first-cloud-project:test_output_dataset.wordcount_output

outputTableFieldSchema
Un esquema que define el esquema de la tabla de BigQuery de salida
GcsOutputPath
Ruta de salida para almacenar datos temporales de Cloud Storage (gs://bucket/dir/)
    // Define the schema we will be using for the output BigQuery table.
    List<TableFieldSchema> outputTableFieldSchema = new ArrayList<TableFieldSchema>();
    outputTableFieldSchema.add(new TableFieldSchema().setName("Word").setType("STRING"));
    outputTableFieldSchema.add(new TableFieldSchema().setName("Count").setType("INTEGER"));
    TableSchema outputSchema = new TableSchema().setFields(outputTableFieldSchema);

    // Create the job and get its configuration.
    Job job = new Job(parser.getConfiguration(), "wordcount");
    Configuration conf = job.getConfiguration();

    // Set the job-level projectId.
    conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);

    // Configure input.
    BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);

    // Configure output.
    BigQueryOutputConfiguration.configure(
        conf,
        outputQualifiedTableId,
        outputSchema,
        outputGcsPath,
        BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
        TextOutputFormat.class);

    // (Optional) Configure the KMS key used to encrypt the output table.
    BigQueryOutputConfiguration.setKmsKeyName(
        conf,
        "projects/myproject/locations/us-west1/keyRings/r1/cryptoKeys/k1");
);

Reductor

La clase IndirectBigQueryOutputFormat escribe en BigQuery. Toma una clave y un valor JsonObject como entrada y escribe solo el valor JsonObject en BigQuery (la clave se ignora). El JsonObject debe contener un registro de BigQuery en formato JSON. El Reducer debe generar un par clave-valor de cualquier tipo (NullWritable se usa en nuestro ejemplo de tarea WordCount) y JsonObject. A continuación se muestra el Reducer de la tarea de ejemplo WordCount.

  /**
   * Reducer function for WordCount.
   */
  public static class Reduce
      extends Reducer<Text, LongWritable, JsonObject, NullWritable> {

    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
      // Add up the values to get a total number of occurrences of our word.
      long count = 0;
      for (LongWritable val : values) {
        count = count + val.get();
      }

      JsonObject jsonObject = new JsonObject();
      jsonObject.addProperty("Word", key.toString());
      jsonObject.addProperty("Count", count);
      // Key does not matter.
      context.write(jsonObject, NullWritable.get());
    }
  }

Limpieza

Una vez que se haya completado el trabajo, limpia las rutas de exportación de Cloud Storage.

job.waitForCompletion(true);
GsonBigQueryInputFormat.cleanupJob(job.getConfiguration(), job.getJobID());

Puedes ver el recuento de palabras en la tabla de salida de BigQuery en la Google Cloud consola.

Código completo de un ejemplo de tarea WordCount

El código siguiente es un ejemplo de una tarea sencilla de recuento de palabras que agrega recuentos de palabras de objetos de BigQuery.

package com.google.cloud.hadoop.io.bigquery.samples;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat;
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration;
import com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputFormat;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Sample program to run the Hadoop Wordcount example over tables in BigQuery.
 */
public class WordCount {

 // The configuration key used to specify the BigQuery field name
  // ("column name").
  public static final String WORDCOUNT_WORD_FIELDNAME_KEY =
      "mapred.bq.samples.wordcount.word.key";

  // Default value for the configuration entry specified by
  // WORDCOUNT_WORD_FIELDNAME_KEY. Examples: 'word' in
  // publicdata:samples.shakespeare or 'repository_name'
  // in publicdata:samples.github_timeline.
  public static final String WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT = "word";

  // Guava might not be available, so define a null / empty helper:
  private static boolean isStringNullOrEmpty(String toTest) {
    return toTest == null || "".equals(toTest);
  }

  /**
   * The mapper function for WordCount. For input, it consumes a LongWritable
   * and JsonObject as the key and value. These correspond to a row identifier
   * and Json representation of the row's values/columns.
   * For output, it produces Text and a LongWritable as the key and value.
   * These correspond to the word and a count for the number of times it has
   * occurred.
   */

  public static class Map
      extends Mapper <LongWritable, JsonObject, Text, LongWritable> {
    private static final LongWritable ONE = new LongWritable(1);
    private Text word = new Text();
    private String wordKey;

    @Override
    public void setup(Context context)
        throws IOException, InterruptedException {
      // Find the runtime-configured key for the field name we're looking for in
      // the map task.
      Configuration conf = context.getConfiguration();
      wordKey = conf.get(WORDCOUNT_WORD_FIELDNAME_KEY, WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT);
    }

    @Override
    public void map(LongWritable key, JsonObject value, Context context)
        throws IOException, InterruptedException {
      JsonElement countElement = value.get(wordKey);
      if (countElement != null) {
        String wordInRecord = countElement.getAsString();
        word.set(wordInRecord);
        // Write out the key, value pair (write out a value of 1, which will be
        // added to the total count for this word in the Reducer).
        context.write(word, ONE);
      }
    }
  }

  /**
   * Reducer function for WordCount. For input, it consumes the Text and
   * LongWritable that the mapper produced. For output, it produces a JsonObject
   * and NullWritable. The JsonObject represents the data that will be
   * loaded into BigQuery.
   */
  public static class Reduce
      extends Reducer<Text, LongWritable, JsonObject, NullWritable> {

    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
      // Add up the values to get a total number of occurrences of our word.
      long count = 0;
      for (LongWritable val : values) {
        count = count + val.get();
      }

      JsonObject jsonObject = new JsonObject();
      jsonObject.addProperty("Word", key.toString());
      jsonObject.addProperty("Count", count);
      // Key does not matter.
      context.write(jsonObject, NullWritable.get());
    }
  }

  /**
   * Configures and runs the main Hadoop job. Takes a String[] of 5 parameters:
   * [ProjectId] [QualifiedInputTableId] [InputTableFieldName]
   * [QualifiedOutputTableId] [GcsOutputPath]
   *
   * ProjectId - Project under which to issue the BigQuery
   * operations. Also serves as the default project for table IDs that don't
   * specify a project for the table.
   *
   * QualifiedInputTableId - Input table ID of the form
   * (Optional ProjectId):[DatasetId].[TableId]
   *
   * InputTableFieldName - Name of the field to count in the
   * input table, e.g., 'word' in publicdata:samples.shakespeare or
   * 'repository_name' in publicdata:samples.github_timeline.
   *
   * QualifiedOutputTableId - Input table ID of the form
   * (Optional ProjectId):[DatasetId].[TableId]
   *
   * GcsOutputPath - The output path to store temporary
   * Cloud Storage data, e.g., gs://bucket/dir/
   *
   * @param args a String[] containing ProjectId, QualifiedInputTableId,
   *     InputTableFieldName, QualifiedOutputTableId, and GcsOutputPath.
   * @throws IOException on IO Error.
   * @throws InterruptedException on Interrupt.
   * @throws ClassNotFoundException if not all classes are present.
   */
  public static void main(String[] args)
      throws IOException, InterruptedException, ClassNotFoundException {

    // GenericOptionsParser is a utility to parse command line arguments
    // generic to the Hadoop framework. This example doesn't cover the specifics,
    // but recognizes several standard command line arguments, enabling
    // applications to easily specify a NameNode, a ResourceManager, additional
    // configuration resources, etc.
    GenericOptionsParser parser = new GenericOptionsParser(args);
    args = parser.getRemainingArgs();

    // Make sure we have the right parameters.
    if (args.length != 5) {
      System.out.println(
          "Usage: hadoop jar bigquery_wordcount.jar [ProjectId] [QualifiedInputTableId] "
              + "[InputTableFieldName] [QualifiedOutputTableId] [GcsOutputPath]\n"
              + "    ProjectId - Project under which to issue the BigQuery operations. Also serves "
              + "as the default project for table IDs that don't explicitly specify a project for "
              + "the table.\n"
              + "    QualifiedInputTableId - Input table ID of the form "
              + "(Optional ProjectId):[DatasetId].[TableId]\n"
              + "    InputTableFieldName - Name of the field to count in the input table, e.g., "
              + "'word' in publicdata:samples.shakespeare or 'repository_name' in "
              + "publicdata:samples.github_timeline.\n"
              + "    QualifiedOutputTableId - Input table ID of the form "
              + "(Optional ProjectId):[DatasetId].[TableId]\n"
              + "    GcsOutputPath - The output path to store temporary Cloud Storage data, e.g., "
              + "gs://bucket/dir/");
      System.exit(1);
    }

    // Get the individual parameters from the command line.
    String projectId = args[0];
    String inputQualifiedTableId = args[1];
    String inputTableFieldId = args[2];
    String outputQualifiedTableId = args[3];
    String outputGcsPath = args[4];

   // Define the schema we will be using for the output BigQuery table.
    List<TableFieldSchema> outputTableFieldSchema = new ArrayList<TableFieldSchema>();
    outputTableFieldSchema.add(new TableFieldSchema().setName("Word").setType("STRING"));
    outputTableFieldSchema.add(new TableFieldSchema().setName("Count").setType("INTEGER"));
    TableSchema outputSchema = new TableSchema().setFields(outputTableFieldSchema);

    // Create the job and get its configuration.
    Job job = new Job(parser.getConfiguration(), "wordcount");
    Configuration conf = job.getConfiguration();

    // Set the job-level projectId.
    conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);

    // Configure input.
    BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);

    // Configure output.
    BigQueryOutputConfiguration.configure(
        conf,
        outputQualifiedTableId,
        outputSchema,
        outputGcsPath,
        BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
        TextOutputFormat.class);

    // (Optional) Configure the KMS key used to encrypt the output table.
    BigQueryOutputConfiguration.setKmsKeyName(
        conf,
        "projects/myproject/locations/us-west1/keyRings/r1/cryptoKeys/k1");

    conf.set(WORDCOUNT_WORD_FIELDNAME_KEY, inputTableFieldId);

    // This helps Hadoop identify the Jar which contains the mapper and reducer
    // by specifying a class in that Jar. This is required if the jar is being
    // passed on the command line to Hadoop.
    job.setJarByClass(WordCount.class);

    // Tell the job what data the mapper will output.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setInputFormatClass(GsonBigQueryInputFormat.class);

    // Instead of using BigQueryOutputFormat, we use the newer
    // IndirectBigQueryOutputFormat, which works by first buffering all the data
    // into a Cloud Storage temporary file, and then on commitJob, copies all data from
    // Cloud Storage into BigQuery in one operation. Its use is recommended for large jobs
    // since it only requires one BigQuery "load" job per Hadoop/Spark job, as
    // compared to BigQueryOutputFormat, which performs one BigQuery job for each
    // Hadoop/Spark task.
    job.setOutputFormatClass(IndirectBigQueryOutputFormat.class);

    job.waitForCompletion(true);

    // After the job completes, clean up the Cloud Storage export paths.
    GsonBigQueryInputFormat.cleanupJob(job.getConfiguration(), job.getJobID());

    // You can view word counts in the BigQuery output table at
    // https://console.cloud.google.com/.
  }
}

Versión de Java

El conector de BigQuery requiere Java 8.

Información de dependencia de Apache Maven

<dependency>
    <groupId>com.google.cloud.bigdataoss</groupId>
    <artifactId>bigquery-connector</artifactId>
    <version>insert "hadoopX-X.X.X" connector version number here</version>
</dependency>

Para obtener información detallada, consulta las notas de la versión y la referencia de Javadoc del conector de BigQuery.