Read from BigQuery to Dataflow

This document describes how to read data from BigQuery to Dataflow.

Overview

The BigQuery I/O connector supports two options for reading from BigQuery:

  • Direct table reads. This option is faster, because it uses the BigQuery Storage Read API.
  • Export job. With this option, BigQuery runs an export job that writes the table data to Cloud Storage. The connector then reads the exported data from Cloud Storage. This option is less efficient, because it requires the export step.

Export jobs are the default option. To specify direct reads, call withMethod(Method.DIRECT_READ).

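The connector serializes the table data into a PCollection. Each element in the PCollection represents a single table row. The connector supports the following serialization methods:

  • Avro-formatted records. You provide a function that parses each Avro record into a custom data type.
  • TableRow objects. This method doesn't require a custom data type, but it usually has lower performance than reading Avro-formatted records, because of the overhead of converting each row to a TableRow.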

Parallelism

Parallelism in this connector depends on the read method:

  • Direct reads: The I/O connector produces a dynamic number of streams, based on the size of the read request. It reads these streams directly from BigQuery in parallel.

  • Export jobs: BigQuery determines how many files to write to Cloud Storage. The number of files depends on the query and the volume of data. The I/O connector reads the exported files in parallel.

Performance

The following table shows performance metrics for various BigQuery I/O read options. The workloads were run on one e2-standard-2 worker, using the Apache Beam SDK 2.49.0 for Java. They did not use Runner v2.

Workload: 100 M records, 1 kB each, 1 column

Read option     Throughput (bytes)   Throughput (elements)
Storage Read    120 MBps             88,000 elements per second
Avro Export     105 MBps             78,000 elements per second
JSON Export     110 MBps             81,000 elements per second

These metrics are based on simple batch pipelines. They are intended to compare performance between I/O connectors, and are not necessarily representative of real-world pipelines. Dataflow pipeline performance is complex, and is a function of VM type, the data being processed, the performance of external sources and sinks, and user code. Metrics are based on running the Java SDK, and aren't representative of the performance characteristics of other language SDKs. For more information, see Beam IO Performance.

Best practices

  • In general, we recommend using direct table reads (Method.DIRECT_READ). The Storage Read API is better suited to data pipelines than export jobs, because it doesn't need the intermediate step of exporting data.

  • If you use direct reads, you are charged for Storage Read API usage. See Data extraction pricing on the BigQuery pricing page.

  • There is no additional cost for export jobs. However, export jobs have limits. For large data movement where timeliness is a priority and cost is adjustable, we recommend direct reads.

  • The Storage Read API has quota limits. Use Google Cloud metrics to monitor your quota usage.

  • When using the Storage Read API, you might see lease expiration and session timeout errors in the logs, such as:

    • DEADLINE_EXCEEDED
    • Server Unresponsive
    • StatusCode.FAILED_PRECONDITION details = "there was an error operating on 'projects/<projectID>/locations/<location>/sessions/<sessionID>/streams/<streamID>': session

    These errors can occur when an operation takes longer than the timeout, usually in pipelines that run for longer than 6 hours. To mitigate this issue, switch to export jobs.
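Because the mitigation for these errors is to change the read method, it can help to make the method a pipeline option instead of hard-coding it. The following is a minimal sketch, assuming a hypothetical ReadMethodOptions interface with a --readMethod flag; only TypedRead.Method and the Beam options annotations come from the Beam SDK.

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ConfigurableReadMethod {

  // Hypothetical options interface: --readMethod=EXPORT switches away from the Storage Read API.
  public interface ReadMethodOptions extends PipelineOptions {
    @Description("BigQuery read method: DIRECT_READ or EXPORT")
    @Default.Enum("DIRECT_READ")
    Method getReadMethod();

    void setReadMethod(Method value);
  }

  // Builds the read transform with whichever method the options specify.
  // With EXPORT, the pipeline also needs --tempLocation set to a Cloud Storage path.
  public static TypedRead<TableRow> buildRead(ReadMethodOptions options, String tableSpec) {
    return BigQueryIO.readTableRows()
        .from(tableSpec)
        .withMethod(options.getReadMethod());
  }

  // Parses command-line flags into the hypothetical options interface.
  public static ReadMethodOptions parse(String... args) {
    PipelineOptionsFactory.register(ReadMethodOptions.class);
    return PipelineOptionsFactory.fromArgs(args).as(ReadMethodOptions.class);
  }

  public static void main(String[] args) {
    ReadMethodOptions options = parse(args);
    System.out.println("Read method: " + options.getReadMethod());
  }
}
```

With this approach, a long-running pipeline that hits session timeouts can be relaunched with --readMethod=EXPORT (plus --tempLocation) without a code change.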

Examples

The code examples in this section use direct table reads.

To use an export job instead, omit the call to withMethod or specify Method.EXPORT. Then set the --tempLocation pipeline option to specify a Cloud Storage bucket for the exported files.
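A minimal sketch of the export-job variant follows. The table spec is a placeholder, and requireGcsTempLocation is a hypothetical helper that checks that --tempLocation points at a gs:// path before the pipeline is built.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class BigQueryReadWithExport {

  // Hypothetical helper: export jobs stage files in Cloud Storage, so require a gs:// tempLocation.
  static String requireGcsTempLocation(PipelineOptions options) {
    String temp = options.getTempLocation();
    if (temp == null || !temp.startsWith("gs://")) {
      throw new IllegalArgumentException(
          "Export reads need --tempLocation=gs://BUCKET/PATH, got: " + temp);
    }
    return temp;
  }

  public static void main(String[] args) {
    // Example flags: --tempLocation=gs://$BUCKET/temp
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    requireGcsTempLocation(options);

    Pipeline pipeline = Pipeline.create(options);
    pipeline.apply(BigQueryIO.readTableRows()
        // Placeholder table spec; replace with your own PROJECT:DATASET.TABLE.
        .from("my-project:my_dataset.my_table")
        // EXPORT is the default, so this call is optional.
        .withMethod(Method.EXPORT));
    pipeline.run().waitUntilFinish();
  }
}
```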

These code examples assume the source table has the following columns:

  • user_name (string)
  • age (integer)

Specified as a JSON schema file:

[
  {"name":"user_name","type":"STRING","mode":"REQUIRED"},
  {"name":"age","type":"INTEGER","mode":"REQUIRED"}
]
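The table-based examples in this section parse their flags into an ExamplePipelineOptions interface that isn't shown on this page. The following is a minimal sketch of such an interface, assuming only the three flags named in the examples' comments (--projectId, --datasetName, --tableName); the version in the published samples might differ.

```java
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.Validation;

// Sketch of the options interface used by the examples; flag names come from their comments.
// @Validation.Required makes withValidation() reject a missing flag.
public interface ExamplePipelineOptions extends PipelineOptions {
  @Description("Project that contains the source table")
  @Validation.Required
  String getProjectId();

  void setProjectId(String value);

  @Description("Dataset that contains the source table")
  @Validation.Required
  String getDatasetName();

  void setDatasetName(String value);

  @Description("Name of the source table")
  @Validation.Required
  String getTableName();

  void setTableName(String value);
}
```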

Read Avro-formatted records

To read BigQuery data into Avro-formatted records, use the read(SerializableFunction) method. This method takes an application-defined function that parses SchemaAndRecord objects and returns a custom data type. The output from the connector is a PCollection of your custom data type.

The following code reads a PCollection<MyData> from a BigQuery table, where MyData is an application-defined class.

Java

To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadAvro {

  // A custom datatype to hold a record from the source table.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    // Function to convert Avro records to MyData instances.
    public static class FromSchemaAndRecord
            implements SerializableFunction<SchemaAndRecord, MyData> {
      @Override public MyData apply(SchemaAndRecord elem) {
        MyData data = new MyData();
        GenericRecord record = elem.getRecord();
        data.name = ((Utf8) record.get("user_name")).toString();
        data.age = (Long) record.get("age");
        return data;
      }
    }
  }

  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into Avro records, using an application-defined parsing function.
        .apply(BigQueryIO.read(new MyData.FromSchemaAndRecord())
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(TypedRead.Method.DIRECT_READ))
        // The output from the previous step is a PCollection<MyData>.
        .apply(MapElements
            .into(TypeDescriptor.of(MyData.class))
            .via((MyData x) -> {
              System.out.printf("Name: %s, Age: %d%n", x.name, x.age);
              return x;
            }));
    pipeline.run().waitUntilFinish();
  }
}

The read method takes an implementation of the SerializableFunction<SchemaAndRecord, T> interface, which defines a function to convert from Avro records to a custom data class. In the previous code example, the MyData.FromSchemaAndRecord.apply method implements this conversion function. The example function parses the user_name and age fields from the Avro record and returns a MyData instance.
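Because the parsing function is an ordinary SerializableFunction, you can check it without running a pipeline by building a SchemaAndRecord by hand. The following sketch assumes a hypothetical ToSummary function that uses the same field access as the example but returns a string; it calls toString() on the name value so that it works whether Avro returns a Utf8 or a String.

```java
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.transforms.SerializableFunction;

public class ParseFunctionCheck {

  // Hypothetical parse function with the same field access as MyData.FromSchemaAndRecord.
  static class ToSummary implements SerializableFunction<SchemaAndRecord, String> {
    @Override
    public String apply(SchemaAndRecord elem) {
      GenericRecord record = elem.getRecord();
      // toString() handles both Utf8 and String values for STRING columns.
      String name = record.get("user_name").toString();
      Long age = (Long) record.get("age");
      return name + " (" + age + ")";
    }
  }

  // Avro schema shaped like the example table: user_name STRING, age INTEGER (Avro long).
  private static final String AVRO_SCHEMA =
      "{\"type\":\"record\",\"name\":\"Row\",\"fields\":["
          + "{\"name\":\"user_name\",\"type\":\"string\"},"
          + "{\"name\":\"age\",\"type\":\"long\"}]}";

  // Builds a SchemaAndRecord (Avro record plus BigQuery table schema) for testing.
  static SchemaAndRecord sample(String name, long age) {
    GenericRecord record = new GenericData.Record(new Schema.Parser().parse(AVRO_SCHEMA));
    record.put("user_name", new Utf8(name));
    record.put("age", age);
    TableSchema tableSchema = new TableSchema().setFields(Arrays.asList(
        new TableFieldSchema().setName("user_name").setType("STRING").setMode("REQUIRED"),
        new TableFieldSchema().setName("age").setType("INTEGER").setMode("REQUIRED")));
    return new SchemaAndRecord(record, tableSchema);
  }

  public static void main(String[] args) {
    System.out.println(new ToSummary().apply(sample("Alice", 30)));
  }
}
```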

To specify which BigQuery table to read, call the from method, as shown in the previous example. For more information, see Table names in the BigQuery I/O connector documentation.
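Besides a PROJECT:DATASET.TABLE string, the from method also accepts a TableReference, which avoids formatting the string by hand. A minimal sketch, assuming placeholder project, dataset, and table names; tableRef and buildRead are hypothetical helper names.

```java
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;

public class TableReferenceRead {

  // Hypothetical helper: builds a TableReference instead of a "project:dataset.table" string.
  static TableReference tableRef(String projectId, String datasetName, String tableName) {
    return new TableReference()
        .setProjectId(projectId)
        .setDatasetId(datasetName)
        .setTableId(tableName);
  }

  // Passes the reference to from(); the rest of the read is configured as in the examples.
  static TypedRead<TableRow> buildRead(TableReference table) {
    return BigQueryIO.readTableRows()
        .from(table)
        .withMethod(TypedRead.Method.DIRECT_READ);
  }

  public static void main(String[] args) {
    // Placeholder names; replace with your own project, dataset, and table.
    TableReference table = tableRef("my-project", "my_dataset", "my_table");
    TypedRead<TableRow> read = buildRead(table);
    System.out.printf("Configured read for %s:%s.%s%n",
        table.getProjectId(), table.getDatasetId(), table.getTableId());
  }
}
```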

Read TableRow objects

The readTableRows method reads BigQuery data into a PCollection of TableRow objects. Each TableRow is a map of key-value pairs that holds a single row of table data. Specify the BigQuery table to read by calling the from method.

The following code reads a PCollection<TableRow> from a BigQuery table.

Java

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadTableRows {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into TableRow objects.
        .apply(BigQueryIO.readTableRows()
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(Method.DIRECT_READ)
        )
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            // Use TableRow to access individual fields in the row.
            .via((TableRow row) -> {
              var name = (String) row.get("user_name");
              var age = (String) row.get("age");
              System.out.printf("Name: %s, Age: %s%n", name, age);
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

This example also shows how to access values in a TableRow object. INTEGER values are encoded as strings, to match BigQuery's exported JSON format.
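Because INTEGER values arrive as strings in a TableRow, code that needs a number has to parse them. A minimal sketch follows; getLong is a hypothetical helper, and it also accepts Number values as a defensive assumption, for example when a row is built by hand in a unit test.

```java
import com.google.api.services.bigquery.model.TableRow;

public class TableRowValues {

  // Hypothetical helper: reads an INTEGER column from a TableRow as a long.
  // INTEGER values are encoded as strings; Number is accepted defensively.
  static long getLong(TableRow row, String field) {
    Object value = row.get(field);
    if (value == null) {
      throw new IllegalArgumentException("Missing value for field: " + field);
    }
    if (value instanceof Number) {
      return ((Number) value).longValue();
    }
    return Long.parseLong(value.toString());
  }

  public static void main(String[] args) {
    // Simulates a row as returned by readTableRows(), with age encoded as a string.
    TableRow row = new TableRow().set("user_name", "Alice").set("age", "42");
    long age = getLong(row, "age");
    System.out.println(row.get("user_name") + " is adult: " + (age > 18));
  }
}
```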

Column projection and filtering

When using direct reads (Method.DIRECT_READ), you can make the read operations more efficient by reducing how much data is read from BigQuery and sent over the network.

  • Column projection: Call withSelectedFields to read a subset of columns from the table. This allows efficient reads when tables contain many columns.
  • Row filtering: Call withRowRestriction to specify a predicate that filters data on the server side.

Filter predicates must be deterministic, and aggregation is not supported.

The following example projects the "user_name" and "age" columns, and filters out rows that don't match the predicate "age > 18".

Java

import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadWithProjectionAndFiltering {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(BigQueryIO.readTableRows()
            // Read rows from a specified table.
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(TypedRead.Method.DIRECT_READ)
            .withSelectedFields(Arrays.asList("user_name", "age"))
            .withRowRestriction("age > 18")
        )
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            // Use TableRow to access individual fields in the row.
            .via((TableRow row) -> {
              var name = (String) row.get("user_name");
              var age = row.get("age");
              System.out.printf("Name: %s, Age: %s%n", name, age);
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}
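withSelectedFields and withRowRestriction are methods of TypedRead, so they can also be combined with read(SerializableFunction) instead of readTableRows. A minimal sketch, assuming a hypothetical NameOnly parse function and a placeholder table spec; the coder is set explicitly so the sketch doesn't rely on coder inference for the String output.

```java
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.SerializableFunction;

public class ProjectedAvroRead {

  // Hypothetical parse function: keeps only the user_name value from each Avro record.
  static class NameOnly implements SerializableFunction<SchemaAndRecord, String> {
    @Override
    public String apply(SchemaAndRecord elem) {
      return elem.getRecord().get("user_name").toString();
    }
  }

  // Column projection and row filtering are applied on the server side by the Storage Read API.
  static TypedRead<String> buildRead(String tableSpec) {
    return BigQueryIO.read(new NameOnly())
        .from(tableSpec)
        .withMethod(TypedRead.Method.DIRECT_READ)
        .withSelectedFields(Arrays.asList("user_name", "age"))
        .withRowRestriction("age > 18")
        .withCoder(StringUtf8Coder.of());
  }

  public static void main(String[] args) {
    // Placeholder table spec; pass your own PROJECT:DATASET.TABLE as the first argument.
    String tableSpec = args.length > 0 ? args[0] : "my-project:my_dataset.my_table";
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
    pipeline.apply(buildRead(tableSpec));
    pipeline.run().waitUntilFinish();
  }
}
```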

Read from a query result

The previous examples show how to read rows from a table. You can also read from the result of a SQL query, by calling fromQuery. This approach moves some of the computational work into BigQuery. You can also use this method to read from a BigQuery view or materialized view, by running a query against the view.

The following example runs a query against a BigQuery public dataset and reads the results. After the pipeline runs, you can see the query job in your BigQuery job history.

Java

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadFromQuery {
  public static void main(String[] args) {
    // The SQL query to run inside BigQuery.
    final String queryString =
        "SELECT repo_name as repo, COUNT(*) as count "
            + "FROM `bigquery-public-data.github_repos.sample_commits` "
            + "GROUP BY repo_name";

    // Parse the pipeline options passed into the application.
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().create();

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read the query results into TableRow objects.
        .apply(BigQueryIO.readTableRows()
            .fromQuery(queryString)
            .usingStandardSql()
            .withMethod(TypedRead.Method.DIRECT_READ))
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            .via((TableRow row) -> {
              System.out.printf("Repo: %s, commits: %s%n", row.get("repo"), row.get("count"));
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}
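As noted earlier, you can read from a view by running a query against it. A minimal sketch, assuming a hypothetical viewQuery helper that builds a Standard SQL query from project, dataset, and view names; it uses the same backtick-quoted project.dataset.name form as the public-dataset query in the previous example.

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;

public class BigQueryReadFromView {

  // Hypothetical helper: Standard SQL query that selects the example columns from a view.
  static String viewQuery(String projectId, String datasetName, String viewName) {
    return String.format(
        "SELECT user_name, age FROM `%s.%s.%s`", projectId, datasetName, viewName);
  }

  // Same read configuration as the query example, pointed at the view.
  static TypedRead<TableRow> buildRead(String query) {
    return BigQueryIO.readTableRows()
        .fromQuery(query)
        .usingStandardSql()
        .withMethod(TypedRead.Method.DIRECT_READ);
  }

  public static void main(String[] args) {
    // Placeholder names; replace with your own project, dataset, and view.
    System.out.println(viewQuery("my-project", "my_dataset", "my_view"));
  }
}
```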

What's next