Performing ETL from a relational database into BigQuery using Dataflow

Last reviewed 2022-08-21 UTC

This tutorial demonstrates how to use Dataflow to extract, transform, and load (ETL) data from an online transaction processing (OLTP) relational database into BigQuery for analysis.

This tutorial is intended for database admins, operations professionals, and cloud architects interested in taking advantage of the analytical query capabilities of BigQuery and the batch processing capabilities of Dataflow.

OLTP databases are often relational databases that store information and process transactions for ecommerce sites, software as a service (SaaS) applications, or games. OLTP databases are usually optimized for transactions, which require the ACID properties (atomicity, consistency, isolation, and durability), and they typically have highly normalized schemas. In contrast, data warehouses tend to be optimized for data retrieval and analysis rather than for transactions, and they typically feature denormalized schemas. Generally, denormalizing data from an OLTP database makes it more useful for analysis in BigQuery.

Objectives

This tutorial shows two approaches to extracting normalized RDBMS data, transforming it, and loading it into BigQuery in denormalized form:

  • Using BigQuery to load and transform the data. Use this approach to perform a one-time load of a small amount of data into BigQuery for analysis. You might also use this approach to prototype your dataset before you automate larger or multiple datasets.
  • Using Dataflow to load, transform, and cleanse the data. Use this approach to load a larger amount of data, load data from multiple data sources, or to load data incrementally or automatically.

Costs

In this document, you use the following billable components of Google Cloud: BigQuery, Dataflow, Compute Engine, and Cloud Storage.

To generate a cost estimate based on your projected usage, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

When you finish the tasks that are described in this document, you can avoid continued billing by deleting the resources that you created. For more information, see Clean up.

Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Compute Engine and Dataflow APIs. If you prefer the command line, see the gcloud sketch after these steps.

    Enable the APIs
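
As an alternative to the Enable the APIs button, you can enable both APIs with the Google Cloud CLI. This is a minimal sketch; it assumes the gcloud CLI is installed and that your project is already selected as the default:

gcloud services enable compute.googleapis.com dataflow.googleapis.com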


Using the MusicBrainz dataset

This tutorial relies on JSON snapshots of tables in the MusicBrainz database, which is built on PostgreSQL and contains information about the full catalog of MusicBrainz music. Some elements of the MusicBrainz schema include:

  • Artists
  • Release groups
  • Releases
  • Recordings
  • Works
  • Labels
  • Many of the relationships between these entities.

The MusicBrainz schema includes three relevant tables: artist, recording, and artist_credit_name. An artist_credit represents credit given to the artist for a recording, and the artist_credit_name rows link the recording with its corresponding artist through the artist_credit value.

This tutorial provides the PostgreSQL tables already extracted into newline-delimited JSON format and stored in a public Cloud Storage bucket: gs://solutions-public-assets/bqetl
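
If you want to inspect the snapshot files before loading them, you can list the bucket contents with the gsutil tool (part of the Google Cloud CLI):

gsutil ls gs://solutions-public-assets/bqetl/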

If you want to perform this step yourself, you need to have a PostgreSQL database containing the MusicBrainz dataset, and use the following commands to export each of the tables:

host=POSTGRES_HOST
user=POSTGRES_USER
database=POSTGRES_DATABASE

for table in artist recording artist_credit_name
do
    pg_cmd="\\copy (select row_to_json(r) from (select * from ${table}) r ) to exported_${table}.json"
    psql -w -h ${host} -U ${user} -d ${database} -c "$pg_cmd"
    # clean up extra '\' characters
    sed -i -e 's/\\\\/\\/g' exported_${table}.json 
done
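
If you export the tables yourself, you also need to copy the resulting JSON files to a Cloud Storage bucket that the later steps can read from. A minimal sketch, assuming a bucket named YOUR_BUCKET that you have already created:

gsutil cp exported_*.json gs://YOUR_BUCKET/bqetl/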

Approach 1: ETL with BigQuery

Use this approach to perform a one-time load of a small amount of data into BigQuery for analysis. You might also use this approach to prototype your dataset before you use automation with larger or multiple datasets.

Create a BigQuery dataset

To create a BigQuery dataset, you load the MusicBrainz tables into BigQuery individually, and then you join the tables that you loaded so that each row contains the data linkage that you want. You store the join results in a new BigQuery table. Then you can delete the original tables that you loaded.

  1. In the Google Cloud console, open BigQuery.

    OPEN BIGQUERY

  2. In the Explorer panel, click the menu next to your project name, and then click Create data set.

  3. In the Create data set dialog, complete the following steps:

    1. In the Data set ID field, enter musicbrainz.
    2. Set the Data Location to us.
    3. Click Create data set.
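
If you prefer to script this step, you can create an equivalent dataset with the bq command-line tool (part of the Google Cloud CLI). This is a sketch; it assumes that your default project is already configured:

bq --location=US mk --dataset musicbrainz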

Import MusicBrainz tables

For each MusicBrainz table, perform the following steps to add a table to the dataset you created:

  1. In the Google Cloud console BigQuery Explorer panel, expand the row with your project name to show the newly created musicbrainz dataset.
  2. Click the menu next to your musicbrainz dataset, and then click Create Table.
  3. In the Create Table dialog, complete the following steps:

    1. In the Create table from drop-down list, select Google Cloud Storage.
    2. In the Select file from GCS bucket field, enter the path to the data file:

      solutions-public-assets/bqetl/artist.json
      
    3. For File format, select JSONL (Newline Delimited JSON).

    4. Ensure that Project contains your project name.

    5. Ensure that Data set is musicbrainz.

    6. For Table, enter the table name, artist.

    7. For Table type, leave Native table selected.

    8. Below the Schema section, click to turn on Edit as Text.

    9. Download the artist schema file and open it in a text editor or viewer.

    10. Replace the contents of the Schema section with the contents of the schema file you downloaded.

    11. Click Create Table.

  4. Wait a few moments for the load job to complete.

  5. When the load has finished, the new table appears under the dataset.

  6. Repeat steps 1 - 5 to create the artist_credit_name table with the following changes:

    • Use the following path for the source data file:

      solutions-public-assets/bqetl/artist_credit_name.json
      
    • Use artist_credit_name as the Table name.

    • Download the artist_credit_name schema file and use the contents for the schema.

  7. Repeat steps 1 - 5 to create the recording table with the following changes:

    • Use the following path for the source data file:

      solutions-public-assets/bqetl/recording.json
      
    • Use recording as the Table name.

    • Download the recording schema file and use the contents for the schema.
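
You can also script these loads with the bq command-line tool instead of the console. The following sketch loads the artist table; it assumes you saved the downloaded schema file as artist_schema.json in the current directory (the other two tables follow the same pattern):

bq load \
  --source_format=NEWLINE_DELIMITED_JSON \
  musicbrainz.artist \
  gs://solutions-public-assets/bqetl/artist.json \
  ./artist_schema.json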

Manually denormalize the data

To denormalize the data, join the data into a new BigQuery table that has one row for each artist's recording, together with selected metadata you want retained for analysis.

  1. If the BigQuery query editor is not open in the Google Cloud console, click Compose New Query.
  2. Copy the following query and paste it into the Query Editor:

    SELECT
        artist.id,
        artist.gid AS artist_gid,
        artist.name AS artist_name,
        artist.area,
        recording.name AS recording_name,
        recording.length,
        recording.gid AS recording_gid,
        recording.video
    FROM
        `musicbrainz.artist` AS artist
    INNER JOIN
        `musicbrainz.artist_credit_name` AS artist_credit_name
    ON
        artist.id = artist_credit_name.artist
    INNER JOIN
        `musicbrainz.recording` AS recording
    ON
        artist_credit_name.artist_credit = recording.artist_credit
    
  3. Click the More drop-down list, and then select Query settings.

  4. In the Query settings dialog, complete the following steps:

    1. Select Set a destination table for query results.
    2. In Dataset, enter musicbrainz and select the dataset in your project.
    3. In Table id, enter recordings_by_artists_manual.
    4. For Destination table write preference, click Overwrite table.
    5. Select the Allow Large Results (no size limit) checkbox.
    6. Click Save.
  5. Click Run.

    When the query completes, the joined data is written to the new BigQuery table, with one row for each artist's recording, and a sample of the results is shown in the Query Results pane, for example:

    | Row | id | artist_gid | artist_name | area | recording_name | length | recording_gid | video |
    |-----|--------|-------------|----------------------------------|-------|------------------------------------------------------|--------|---------------|-------|
    | 1 | 97546 | 125ec42a... | unknown | 240 | Horo Gun Toireamaid Hùgan Fhathast Air | 174106 | c8bbe048... | FALSE |
    | 2 | 266317 | 2e7119b5... | Capella Istropolitana | 189 | Concerto Grosso in D minor, op. 2 no. 3: II. Adagio | 134000 | af0f294d... | FALSE |
    | 3 | 628060 | 34cd3689... | Conspirare | 5196 | Liturgy, op. 42: 9. Praise the Lord from the Heavens | 126933 | 8bab920d... | FALSE |
    | 4 | 423877 | 54401795... | Boys Air Choir | 1178 | Nunc Dimittis | 190000 | 111611eb... | FALSE |
    | 5 | 394456 | 9914f9f9... | L’Orchestre de la Suisse Romande | 23036 | Concert Waltz no. 2, op. 51 | 509960 | b16742d1... | FALSE |
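
If you want to script this step instead of using the console, a roughly equivalent bq invocation writes the join results straight to the destination table. This is a sketch; it assumes your default project is set, the musicbrainz dataset exists, and the --replace flag stands in for the Overwrite table setting:

bq query \
  --nouse_legacy_sql \
  --destination_table=musicbrainz.recordings_by_artists_manual \
  --replace \
  'SELECT
     artist.id,
     artist.gid AS artist_gid,
     artist.name AS artist_name,
     artist.area,
     recording.name AS recording_name,
     recording.length,
     recording.gid AS recording_gid,
     recording.video
   FROM `musicbrainz.artist` AS artist
   INNER JOIN `musicbrainz.artist_credit_name` AS artist_credit_name
     ON artist.id = artist_credit_name.artist
   INNER JOIN `musicbrainz.recording` AS recording
     ON artist_credit_name.artist_credit = recording.artist_credit'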

Approach 2: ETL into BigQuery with Dataflow

In this section of the tutorial, instead of using the BigQuery UI, you use a sample program to load data into BigQuery by using a Dataflow pipeline. Then, you use the Beam programming model to denormalize and cleanse data to load into BigQuery.

Before you begin, review the concepts and the sample code.

Review the concepts

Although the data is small and can quickly be uploaded by using the BigQuery UI, for the purpose of this tutorial you can also use Dataflow for ETL. Use Dataflow for ETL into BigQuery instead of the BigQuery UI when you are performing massive joins, that is, joins of around 500 to 5,000 columns across more than 10 TB of data, and you have the following goals:

  • You want to clean or transform your data as it's loaded into BigQuery, instead of storing it and joining afterwards. As a result, this approach also has lower storage requirements because data is only stored in BigQuery in its joined and transformed state.
  • You plan to do custom data cleansing (which cannot be simply achieved with SQL).
  • You plan to combine the data with data outside of the OLTP, such as logs or remotely accessed data, during the loading process.
  • You plan to automate testing and deployment of data-loading logic using continuous integration or continuous deployment (CI/CD).
  • You anticipate gradual iteration, enhancement, and improvement of the ETL process over time.
  • You plan to add data incrementally, as opposed to performing a one-time ETL.

Here's a diagram of the data pipeline that's created by the sample program:

Data pipeline using BigQuery.

In the example code, many of the pipeline steps are grouped or wrapped in convenience methods, given descriptive names, and reused. In the diagram, reused steps are indicated by dashed borders.

Review the pipeline code

The code creates a pipeline that performs the following steps:

  1. Loads each table that you want to be part of the join from the public Cloud Storage bucket into a PCollection of strings. Each element comprises the JSON representation of a row of the table.

    public static PCollection<String> loadText(Pipeline p, String name) {
      BQETLOptions options = (BQETLOptions) p.getOptions();
      String loadingBucket = options.getLoadingBucketURL();
      String objectToLoad = storedObjectName(loadingBucket, name);
      return p.apply(name, TextIO.read().from(objectToLoad));
    }
  2. Converts those JSON strings to object representations, MusicBrainzDataObject objects, and then organizes the object representations by one of the column values, such as a primary or foreign key.

    public static PCollection<KV<Long, MusicBrainzDataObject>> loadTableFromText(
        PCollection<String> text, String name, String keyName) {
      final String namespacedKeyname = name + "_" + keyName;
      return text.apply(
          "load " + name,
          MapElements.into(new TypeDescriptor<KV<Long, MusicBrainzDataObject>>() {})
              .via(
                  (String input) -> {
                    MusicBrainzDataObject datum = JSONReader.readObject(name, input);
                    Long key = (Long) datum.getColumnValue(namespacedKeyname);
                    return KV.of(key, datum);
                  }));
    }
  3. Joins the lists based on the common artist. The artist_credit_name table links an artist credit with its recording and includes the artist foreign key. The artist_credit_name table is loaded as a list of key-value (KV) objects, where the key (K) member is the artist.

    PCollection<MusicBrainzDataObject> artistCredits =
        MusicBrainzTransforms.innerJoin("artists with artist credits", artists, artistCreditName);
  4. Joins the list by using the MusicBrainzTransforms.innerJoin() method.

    public static PCollection<MusicBrainzDataObject> innerJoin(
        String name,
        PCollection<KV<Long, MusicBrainzDataObject>> table1,
        PCollection<KV<Long, MusicBrainzDataObject>> table2) {
      final TupleTag<MusicBrainzDataObject> t1 = new TupleTag<MusicBrainzDataObject>() {};
      final TupleTag<MusicBrainzDataObject> t2 = new TupleTag<MusicBrainzDataObject>() {};
      PCollection<KV<Long, CoGbkResult>> joinedResult = group(name, table1, table2, t1, t2);
    1. Groups the collections of KV objects by the key member on which you want to join. This results in a PCollection of KV objects with a long key (the artist.id column value) and resulting CoGbkResult (which stands for combine group by key result). The CoGbkResult object is a tuple of lists of objects with the key value in common from the first and second PCollections. This tuple is addressable by using the tuple tag formulated for each PCollection prior to running the CoGroupByKey operation in the group method.
    2. Merges each matchup of objects into a MusicBrainzDataObject object that represents a join result.

      PCollection<List<MusicBrainzDataObject>> mergedResult =
          joinedResult.apply(
              "merge join results",
              MapElements.into(new TypeDescriptor<List<MusicBrainzDataObject>>() {})
                  .via(
                      (KV<Long, CoGbkResult> group) -> {
                        List<MusicBrainzDataObject> result = new ArrayList<>();
                        Iterable<MusicBrainzDataObject> leftObjects = group.getValue().getAll(t1);
                        Iterable<MusicBrainzDataObject> ri