This document describes how to read data from BigQuery to Dataflow.
Overview
The BigQuery I/O connector supports two options for reading from BigQuery:
- Direct table reads. This option is the fastest, because it uses the BigQuery Storage Read API.
- Export job. With this option, BigQuery runs an export job that writes the table data to Cloud Storage. The connector then reads the exported data from Cloud Storage. This option is less efficient, because it requires the export step.
Export jobs are the default option. To specify direct reads, call withMethod(Method.DIRECT_READ).
The connector serializes the table data into a PCollection. Each element in the PCollection represents a single table row. The connector supports the following serialization methods:
- Read the data as Avro-formatted records. Using this method, you provide a function that parses the Avro records into a custom data type.
- Read the data as TableRow objects. This method is convenient because it doesn't require a custom data type. However, it generally has lower performance than reading Avro-formatted records.
Parallelism
Parallelism in this connector depends on the read method:
- Direct reads: The I/O connector produces a dynamic number of streams, based on the size of the export request. It reads these streams directly from BigQuery in parallel.
- Export jobs: BigQuery determines how many files to write to Cloud Storage. The number of files depends on the query and the volume of data. The I/O connector reads the exported files in parallel.
Performance
The following table shows performance metrics for various BigQuery I/O read options. The workloads were run on one e2-standard-2 worker, using the Apache Beam SDK 2.49.0 for Java. They did not use Runner v2.
100 M records, 1 kB, 1 column | Throughput (bytes) | Throughput (elements) |
---|---|---|
Storage Read | 120 MBps | 88,000 elements per second |
Avro Export | 105 MBps | 78,000 elements per second |
JSON Export | 110 MBps | 81,000 elements per second |
These metrics are based on simple batch pipelines. They are intended to compare performance between I/O connectors, and are not necessarily representative of real-world pipelines. Dataflow pipeline performance is complex, and is a function of VM type, the data being processed, the performance of external sources and sinks, and user code. Metrics are based on running the Java SDK, and aren't representative of the performance characteristics of other language SDKs. For more information, see Beam IO Performance.
Best practices
In general, we recommend using direct table reads (Method.DIRECT_READ). The Storage Read API is better suited to data pipelines than export jobs, because it doesn't need the intermediate step of exporting data.
If you use direct reads, you are charged for Storage Read API usage. See Data extraction pricing in the BigQuery pricing page.
There is no additional cost for export jobs. However, export jobs have limits. For large data movement, where timeliness is a priority and cost is adjustable, direct reads are recommended.
The Storage Read API has quota limits. Use Google Cloud metrics to monitor your quota usage.
When using the Storage Read API, you might see lease expiration and session timeout errors in the logs, such as:
DEADLINE_EXCEEDED
Server Unresponsive
StatusCode.FAILED_PRECONDITION details = "there was an error operating on 'projects/<projectID>/locations/<location>/sessions/<sessionID>/streams/<streamID>': session
These errors can occur when an operation takes longer than the timeout, usually in pipelines that run for longer than 6 hours. To mitigate this issue, switch to file exports.
Examples
The code examples in this section use direct table reads.
To use an export job instead, omit the call to withMethod or specify Method.EXPORT. Then set the --tempLocation pipeline option to specify a Cloud Storage bucket for the exported files.
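As a rough illustration of the export path, the following sketch builds a TableRow read that uses Method.EXPORT. The class name ExportReadSketch, the helper buildRead, and the table spec my-project:my_dataset.my_table are hypothetical placeholders, and the pipeline expects a --tempLocation argument that points at a Cloud Storage path you own.

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ExportReadSketch {
  // Hypothetical table spec; replace with your own project, dataset, and table.
  static final String TABLE = "my-project:my_dataset.my_table";

  // Builds a read that goes through a BigQuery export job and Cloud Storage
  // instead of the Storage Read API.
  static TypedRead<TableRow> buildRead() {
    return BigQueryIO.readTableRows()
        .from(TABLE)
        .withMethod(Method.EXPORT);
  }

  public static void main(String[] args) {
    // Pass --tempLocation=gs://<bucket>/<path> on the command line so that
    // the connector has a place to stage the exported files.
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline pipeline = Pipeline.create(options);
    pipeline.apply("ReadViaExport", buildRead());
    pipeline.run().waitUntilFinish();
  }
}
```

Because export is the default, removing the withMethod call should behave the same way; naming the method explicitly only makes the intent visible in the code.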
These code examples assume the source table has the following columns:
- user_name (string)
- age (integer)
Specified as a JSON schema file:
[
{"name":"user_name","type":"STRING","mode":"REQUIRED"},
{"name":"age","type":"INTEGER","mode":"REQUIRED"}
]
Read Avro-formatted records
To read BigQuery data into Avro-formatted records, use the read(SerializableFunction) method. This method takes an application-defined function that parses SchemaAndRecord objects and returns a custom data type. The output from the connector is a PCollection of your custom data type.
The following code reads a PCollection<MyData> from a BigQuery table, where MyData is an application-defined class.
Java
To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.
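A minimal sketch of such a read, assuming the two-column schema above. The class name ReadAvroSketch, the helper buildRead, and the table spec my-project:my_dataset.my_table are hypothetical, and the explicit SerializableCoder is one illustrative way to give Beam a coder for MyData, not necessarily the approach a production pipeline would choose.

```java
import java.io.Serializable;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class ReadAvroSketch {
  // Hypothetical table spec; replace with your own project, dataset, and table.
  static final String TABLE = "my-project:my_dataset.my_table";

  // Application-defined type that holds one row of the source table.
  public static class MyData implements Serializable {
    public String name;
    public Long age;

    // Converts one Avro record into a MyData instance.
    public static MyData apply(SchemaAndRecord element) {
      GenericRecord record = element.getRecord();
      MyData data = new MyData();
      // Avro strings arrive as Utf8 objects, so convert them explicitly.
      data.name = String.valueOf(record.get("user_name"));
      // BigQuery INTEGER columns map to Avro long values.
      data.age = (Long) record.get("age");
      return data;
    }
  }

  // Builds a direct read that parses each Avro record with MyData.apply.
  static TypedRead<MyData> buildRead() {
    return BigQueryIO.read(MyData::apply)
        .from(TABLE)
        // A method reference hides the output type from Beam's coder
        // inference, so the coder is set explicitly here.
        .withCoder(SerializableCoder.of(MyData.class))
        .withMethod(TypedRead.Method.DIRECT_READ);
  }

  public static void main(String[] args) {
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline pipeline = Pipeline.create(options);
    PCollection<MyData> rows = pipeline.apply("ReadAvro", buildRead());
    // Further transforms on rows would go here.
    pipeline.run().waitUntilFinish();
  }
}
```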
The read method takes a SerializableFunction<SchemaAndRecord, T> interface, which defines a function to convert from Avro records to a custom data class. In the previous code example, the MyData.apply method implements this conversion function. The example function parses the user_name and age fields from the Avro record and returns a MyData instance.
To specify which BigQuery table to read, call the from method, as shown in the previous example. For more information, see Table names in the BigQuery I/O connector documentation.
Read TableRow objects
The readTableRows method reads BigQuery data into a PCollection of TableRow objects. Each TableRow is a map of key-value pairs that holds a single row of table data. Specify the BigQuery table to read by calling the from method.
The following code reads a PCollection<TableRow> from a BigQuery table.
Java
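A minimal sketch of a TableRow read, assuming the same two-column schema. The class name ReadTableRowsSketch, the helper describe, and the table spec my-project:my_dataset.my_table are hypothetical. The helper parses the age value from its string form before using it as a number.

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class ReadTableRowsSketch {
  // Hypothetical table spec; replace with your own project, dataset, and table.
  static final String TABLE = "my-project:my_dataset.my_table";

  // Reads individual fields from a TableRow by column name.
  static String describe(TableRow row) {
    String name = (String) row.get("user_name");
    // INTEGER values are encoded as strings, so parse before numeric use.
    long age = Long.parseLong(String.valueOf(row.get("age")));
    return String.format("Name: %s, Age: %d", name, age);
  }

  public static void main(String[] args) {
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into TableRow objects.
        .apply("ReadTableRows", BigQueryIO.readTableRows()
            .from(TABLE)
            .withMethod(Method.DIRECT_READ))
        // The output of the previous step is a PCollection<TableRow>.
        .apply("Describe", MapElements
            .into(TypeDescriptors.strings())
            .via((TableRow row) -> describe(row)));
    pipeline.run().waitUntilFinish();
  }
}
```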
This example also shows how to access the values from the TableRow dictionary. Integer values are encoded as strings to match BigQuery's exported JSON format.
Column projection and filtering
When using direct reads (Method.DIRECT_READ), you can make the read operations more efficient by reducing how much data is read from BigQuery and sent over the network.
- Column projection: Call withSelectedFields to read a subset of columns from the table. This allows efficient reads when tables contain many columns.
- Row filtering: Call withRowRestriction to specify a predicate that filters data on the server side.
Filter predicates must be deterministic, and aggregation is not supported.
The following example projects the "user_name" and "age" columns, and filters out rows that don't match the predicate "age > 18".
Java
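A minimal sketch of projection and row filtering on a direct read. The class name ProjectionFilterSketch, the helper buildRead, and the table spec my-project:my_dataset.my_table are hypothetical; the column names and the predicate come from the description above.

```java
import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ProjectionFilterSketch {
  // Hypothetical table spec; replace with your own project, dataset, and table.
  static final String TABLE = "my-project:my_dataset.my_table";

  // Builds a direct read that asks BigQuery for two columns only and
  // filters rows on the server side.
  static TypedRead<TableRow> buildRead() {
    return BigQueryIO.readTableRows()
        .from(TABLE)
        // Projection and row restriction apply to direct reads.
        .withMethod(Method.DIRECT_READ)
        .withSelectedFields(Arrays.asList("user_name", "age"))
        .withRowRestriction("age > 18");
  }

  public static void main(String[] args) {
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline pipeline = Pipeline.create(options);
    pipeline.apply("ReadProjectedRows", buildRead());
    pipeline.run().waitUntilFinish();
  }
}
```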
Read from a query result
The previous examples show how to read rows from a table. You can also read from the result of a SQL query, by calling fromQuery. This approach moves some of the computational work into BigQuery. You can also use this method to read from a BigQuery view or materialized view, by running a query against the view.
The following example runs a query against a BigQuery public dataset and reads the results. After the pipeline runs, you can see the query job in your BigQuery job history.
Java
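A minimal sketch of a query-based read. The class name QueryReadSketch and the helper buildRead are hypothetical, and the query against the bigquery-public-data.github_repos.sample_commits table is an illustrative choice of public dataset rather than a required one.

```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class QueryReadSketch {
  // Illustrative query: counts commits per repository in a public dataset.
  static final String QUERY =
      "SELECT repo_name AS repo, COUNT(*) AS count "
          + "FROM `bigquery-public-data.github_repos.sample_commits` "
          + "GROUP BY repo_name";

  // Builds a read over the query result instead of a table.
  static TypedRead<TableRow> buildRead() {
    return BigQueryIO.readTableRows()
        .fromQuery(QUERY)
        // The backtick-quoted table name requires standard SQL.
        .usingStandardSql()
        .withMethod(Method.DIRECT_READ);
  }

  public static void main(String[] args) {
    PipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply("ReadQueryResult", buildRead())
        // Format each result row using the aliases from the query.
        .apply("FormatRow", MapElements
            .into(TypeDescriptors.strings())
            .via((TableRow row) ->
                String.format("Repo: %s, commits: %s", row.get("repo"), row.get("count"))));
    pipeline.run().waitUntilFinish();
  }
}
```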