Lee de BigQuery a Dataflow

En este documento, se describe cómo leer datos de BigQuery a Dataflow mediante el conector de E/S de BigQuery de Apache Beam.

Descripción general

El conector de E/S de BigQuery admite dos opciones para leer desde BigQuery:

  • Lecturas directas de tablas. Esta opción es la más rápida, ya que usa la API de lectura de almacenamiento de BigQuery.
  • Exportar trabajo. Con esta opción, BigQuery ejecuta un trabajo de exportación que escribe los datos de la tabla en Cloud Storage. Luego, el conector lee los datos exportados desde Cloud Storage. Esta opción es menos eficiente, ya que requiere el paso de exportación.

Los trabajos de exportación son la opción predeterminada. Para especificar lecturas directas, llama a withMethod(Method.DIRECT_READ).

El conector serializa los datos de la tabla en un PCollection. Cada elemento en PCollection representa una sola fila de la tabla. El conector admite los siguientes métodos de serialización:

Paralelismo

El paralelismo en este conector depende del método de lectura:

  • Lecturas directas: el conector de E/S produce una cantidad dinámica de transmisiones, según el tamaño de la solicitud de exportación. Lee estas transmisiones directamente desde BigQuery en paralelo.

  • Trabajos de exportación: BigQuery determina cuántos archivos se escriben en Cloud Storage. La cantidad de archivos depende de la consulta y del volumen de datos. El conector de E/S lee los archivos exportados en paralelo.

Rendimiento

En la siguiente tabla, se muestran las métricas de rendimiento de varias opciones de lectura de E/S de BigQuery. Las cargas de trabajo se ejecutaron en un trabajador e2-standard2, con el SDK de Apache Beam 2.49.0 para Java. No usaron Runner v2.

100 millones de registros | 1 KB | 1 columna Capacidad de procesamiento (bytes) Capacidad de procesamiento (elementos)
Lectura de almacenamiento 120 MBps 88,000 elementos por segundo
Exportación de Avro 105 MBps 78,000 elementos por segundo
Exportación de Json 110 MBps 81,000 elementos por segundo

Estas métricas se basan en canalizaciones por lotes simples. Están diseñadas para comparar el rendimiento entre los conectores de E/S y no representan necesariamente las canalizaciones del mundo real. El rendimiento de la canalización de Dataflow es complejo y es una función del tipo de VM, los datos que se procesan, el rendimiento de las fuentes y los receptores externos y el código de usuario. Las métricas se basan en la ejecución del SDK de Java y no representan las características de rendimiento de otros SDK de lenguaje. Para obtener más información, consulta Rendimiento de E/S de Beam.

Prácticas recomendadas

  • En general, recomendamos usar lecturas de tablas directas (Method.DIRECT_READ). La API de Storage Read es más adecuada para canalizaciones de datos que para trabajos de exportación, ya que no necesita el paso intermedio de exportar datos.

  • Si usas lecturas directas, se te cobra por el uso de la API de Storage Read. Consulta Precios de extracción de datos en la página de precios de BigQuery.

  • Los trabajos de exportación no tienen costo adicional. Sin embargo, los trabajos de exportación tienen límites. Para movimientos de datos grandes, en los que la puntualidad es una prioridad y el costo es ajustable, se recomiendan las lecturas directas.

  • La API de Storage Read tiene límites de cuota. Usa las métricas deGoogle Cloud para supervisar el uso de tu cuota.

  • Cuando usas la API de lectura de almacenamiento, es posible que veas errores de vencimiento y tiempo de espera de sesión en los registros, como los siguientes:

    • DEADLINE_EXCEEDED
    • Server Unresponsive
    • StatusCode.FAILED_PRECONDITION details = "there was an error operating on 'projects/<projectID>/locations/<location>/sessions/<sessionID>/streams/<streamID>': session

    Estos errores pueden ocurrir cuando una operación tarda más que el tiempo de espera, por lo general, en canalización que se ejecutan durante más de 6 horas. Para mitigar este problema, cambia a las exportaciones de archivos.

Ejemplos

Los ejemplos de código de esta sección usan lecturas de tablas directas.

Para usar un trabajo de exportación, omite la llamada a withMethod o especifica Method.EXPORT. Luego, configura la opción de canalización --tempLocation a fin de especificar un depósito de Cloud Storage para los archivos exportados.

En estos ejemplos de código, se supone que la tabla de origen tiene las siguientes columnas:

  • name (string)
  • age (entero)

Especificado como un archivo de esquema JSON:

[
  {"name":"user_name","type":"STRING","mode":"REQUIRED"},
  {"name":"age","type":"INTEGER","mode":"REQUIRED"}
]

Lee registros con formato Avro

Para leer datos de BigQuery en registros con formato Avro, usa el método read(SerializableFunction). Este método toma una función definida por la aplicación que analiza objetos SchemaAndRecord y muestra un tipo de datos personalizado. El resultado del conector es un PCollection de tu tipo de datos personalizado.

El siguiente código lee un PCollection<MyData> de una tabla de BigQuery, en la que MyData es una clase definida por la aplicación.

Java

Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadAvro {

  // A custom datatype to hold a record from the source table.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    // Function to convert Avro records to MyData instances.
    public static class FromSchemaAndRecord
            implements SerializableFunction<SchemaAndRecord, MyData> {
      @Override public MyData apply(SchemaAndRecord elem) {
        MyData data = new MyData();
        GenericRecord record = elem.getRecord();
        data.name = ((Utf8) record.get("user_name")).toString();
        data.age = (Long) record.get("age");
        return data;
      }
    }
  }

  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into Avro records, using an application-defined parsing function.
        .apply(BigQueryIO.read(new MyData.FromSchemaAndRecord())
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(TypedRead.Method.DIRECT_READ))
        // The output from the previous step is a PCollection<MyData>.
        .apply(MapElements
            .into(TypeDescriptor.of(MyData.class))
            .via((MyData x) -> {
              System.out.printf("Name: %s, Age: %d%n", x.name, x.age);
              return x;
            }));
    pipeline.run().waitUntilFinish();
  }
}

El método read toma una interfaz SerializableFunction<SchemaAndRecord, T>, que define una función para convertir de los registros de Avro en una clase de datos personalizada. En el ejemplo de código anterior, el método MyData.apply implementa esta función de conversión. La función de ejemplo analiza los campos name y age del registro de Avro y muestra una instancia de MyData.

Para especificar qué tabla de BigQuery leer, llama al método from, como se muestra en el ejemplo anterior. Para obtener más información, consulta Nombres de tablas en la documentación del conector de E/S de BigQuery.

Lectura de objetos TableRow

El método readTableRows lee datos de BigQuery en un PCollection de objetos TableRow. Cada TableRow es un mapa de pares clave-valor que contiene una sola fila de datos de tabla. Para especificar la tabla de BigQuery que se leerá, llama al método from.

El siguiente código lee un PCollection<TableRows> de una tabla de BigQuery.

Java

Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BiqQueryReadTableRows {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into TableRow objects.
        .apply(BigQueryIO.readTableRows()
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(Method.DIRECT_READ)
        )
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            // Use TableRow to access individual fields in the row.
            .via((TableRow row) -> {
              var name = (String) row.get("user_name");
              var age = (String) row.get("age");
              System.out.printf("Name: %s, Age: %s%n", name, age);
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

En este ejemplo, también se muestra cómo acceder a los valores del diccionario TableRow. Los valores enteros se codifican como cadenas para coincidir con el formato de JSON exportado de BigQuery.

Proyección y filtrado de columnas

Cuando usas lecturas directas (Method.DIRECT_READ), puedes hacer que las operaciones de lectura sean más eficientes, ya que se reduce la cantidad de datos que se leen de BigQuery y se envían a través de la red.

  • Proyección de columnas: Llama a withSelectedFields para leer un subconjunto de columnas de la tabla. Esto permite realizar lecturas eficientes cuando las tablas contienen muchas columnas.
  • Filtrado de filas: Llama a withRowRestriction para especificar un predicado que filtre los datos en el servidor.

Los predicados de filtro deben ser determinísticos y no se admite la agregación.

En el siguiente ejemplo, se proyectan las columnas "user_name" y "age", y se filtran las filas que no coinciden con el predicado "age > 18".

Java

Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadWithProjectionAndFiltering {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(BigQueryIO.readTableRows()
            // Read rows from a specified table.
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(TypedRead.Method.DIRECT_READ)
            .withSelectedFields(Arrays.asList("user_name", "age"))
            .withRowRestriction("age > 18")
        )
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            // Use TableRow to access individual fields in the row.
            .via((TableRow row) -> {
              var name = (String) row.get("user_name");
              var age = row.get("age");
              System.out.printf("Name: %s, Age: %s%n", name, age);
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Lee desde el resultado de una consulta

En los ejemplos anteriores, se muestra cómo leer filas de una tabla. También puedes leer del resultado de una consulta en SQL llamando a fromQuery. Con este enfoque, se traslada parte del trabajo de procesamiento a BigQuery. También puedes usar este método para leer desde una vista de BigQuery o una vista materializada. Para ello, ejecuta una consulta en la vista.

En el siguiente ejemplo, se ejecuta una consulta en un conjunto de datos públicos de BigQuery y se leen los resultados. Una vez que se ejecuta la canalización, puedes ver el trabajo de consulta en tu historial de trabajos de BigQuery.

Java

Para autenticarte en Dataflow, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadFromQuery {
  public static void main(String[] args) {
    // The SQL query to run inside BigQuery.
    final String queryString =
        "SELECT repo_name as repo, COUNT(*) as count "
            + "FROM `bigquery-public-data.github_repos.sample_commits` "
            + "GROUP BY repo_name";

    // Parse the pipeline options passed into the application.
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().create();

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read the query results into TableRow objects.
        .apply(BigQueryIO.readTableRows()
            .fromQuery(queryString)
            .usingStandardSql()
            .withMethod(TypedRead.Method.DIRECT_READ))
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            .via((TableRow row) -> {
              System.out.printf("Repo: %s, commits: %s%n", row.get("repo"), row.get("count"));
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

¿Qué sigue?