Membaca dari BigQuery ke Dataflow

Dokumen ini menjelaskan cara membaca data dari BigQuery ke Dataflow.

Ringkasan

Untuk sebagian besar kasus penggunaan, pertimbangkan untuk menggunakan Managed I/O untuk membaca dari BigQuery. I/O Terkelola menyediakan fitur seperti upgrade otomatis dan API konfigurasi yang konsisten. Saat membaca dari BigQuery, I/O Terkelola melakukan pembacaan tabel langsung yang menawarkan performa pembacaan terbaik.

Jika Anda memerlukan penyesuaian performa yang lebih canggih, pertimbangkan untuk menggunakan konektor BigQueryIO. Konektor BigQueryIO mendukung pembacaan tabel langsung dan pembacaan dari tugas ekspor BigQuery. Fitur ini juga menawarkan kontrol yang lebih terperinci atas deserialisasi rekaman tabel. Untuk mengetahui informasi selengkapnya, lihat Menggunakan konektor BigQueryIO dalam dokumen ini.

Proyeksi dan pemfilteran kolom

Untuk mengurangi volume data yang dibaca pipeline dari BigQuery, Anda dapat menggunakan teknik berikut:

  • Proyeksi kolom menentukan subset kolom yang akan dibaca dari tabel. Gunakan proyeksi kolom saat tabel Anda memiliki banyak kolom dan Anda hanya perlu membaca sebagian kecil kolom tersebut.
  • Pemfilteran baris menentukan predikat yang akan diterapkan ke tabel. Operasi baca BigQuery hanya menampilkan baris yang cocok dengan filter, yang dapat mengurangi total jumlah data yang di-ingest oleh pipeline.

Contoh berikut membaca kolom "user_name" dan "age" dari tabel dan memfilter baris yang tidak cocok dengan predikat "age > 18". Contoh ini menggunakan I/O Terkelola.

Java

Untuk melakukan autentikasi ke Dataflow, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import com.google.common.collect.ImmutableMap;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

public class BigQueryReadWithProjectionAndFiltering {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    String tableSpec = String.format("%s:%s.%s",
        options.getProjectId(),
        options.getDatasetName(),
        options.getTableName());

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("table", tableSpec)
        .put("row_restriction", "age > 18")
        .put("fields", List.of("user_name", "age"))
        .build();

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(Managed.read(Managed.BIGQUERY).withConfig(config)).getSinglePCollection()
        .apply(MapElements
            .into(TypeDescriptors.strings())
            // Access individual fields in the row.
            .via((Row row) -> {
              String output = String.format("Name: %s, Age: %s%n",
                  row.getString("user_name"),
                  row.getInt64("age"));
              System.out.println(output);
              return output;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Membaca dari hasil kueri

Contoh berikut menggunakan I/O Terkelola untuk membaca hasil kueri SQL. Aplikasi ini menjalankan kueri terhadap set data publik BigQuery. Anda juga dapat menggunakan kueri SQL untuk membaca dari tampilan BigQuery atau tampilan terwujud.

Java

Untuk melakukan autentikasi ke Dataflow, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

public class BigQueryReadFromQuery {
  public static void main(String[] args) {
    // The SQL query to run inside BigQuery.
    final String queryString =
        "SELECT repo_name as repo, COUNT(*) as count "
            + "FROM `bigquery-public-data.github_repos.sample_commits` "
            + "GROUP BY repo_name";

    // Parse the pipeline options passed into the application.
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().create();

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("query", queryString)
        .build();

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(Managed.read(Managed.BIGQUERY).withConfig(config)).getSinglePCollection()
        .apply(MapElements
            .into(TypeDescriptors.strings())
            // Access individual fields in the row.
            .via((Row row) -> {
              String output = String.format("Repo: %s, commits: %d%n",
                  row.getString("repo"),
                  row.getInt64("count"));
              System.out.println(output);
              return output;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Menggunakan konektor BigQueryIO

Konektor BigQueryIO mendukung metode serialisasi berikut:

Konektor mendukung dua opsi untuk membaca data:

  • Mengekspor tugas. Secara default, konektor BigQueryIO menjalankan tugas ekspor BigQuery yang menulis data tabel ke Cloud Storage. Kemudian, konektor membaca data dari Cloud Storage.
  • Pembacaan tabel langsung. Opsi ini lebih cepat daripada tugas ekspor, karena menggunakan BigQuery Storage Read API dan melewati langkah ekspor. Untuk menggunakan pembacaan tabel langsung, panggil withMethod(Method.DIRECT_READ) saat Anda membangun pipeline.

Saat memilih opsi yang akan digunakan, pertimbangkan poin-poin berikut:

  • Secara umum, sebaiknya gunakan pembacaan tabel langsung. Storage Read API lebih cocok untuk pipeline data daripada tugas ekspor, karena tidak memerlukan langkah perantara untuk mengekspor data.

  • Jika menggunakan pembacaan langsung, Anda akan ditagih untuk penggunaan Storage Read API. Lihat Harga ekstraksi data di halaman harga BigQuery.

  • Tidak ada biaya tambahan untuk tugas ekspor. Namun, tugas ekspor memiliki batas. Untuk pemindahan data dalam jumlah besar, yang mengutamakan ketepatan waktu dan biaya dapat disesuaikan, sebaiknya gunakan pembacaan langsung.

  • Storage Read API memiliki batas kuota. Gunakan metrikGoogle Cloud untuk memantau penggunaan kuota Anda.

  • Jika Anda menggunakan tugas ekspor, tetapkan --tempLocation opsi pipeline ke untuk menentukan bucket Cloud Storage bagi file yang diekspor.

  • Saat menggunakan Storage Read API, Anda mungkin melihat error waktu tunggu sesi dan masa berlaku sewa di log, seperti:

    • DEADLINE_EXCEEDED
    • Server Unresponsive
    • StatusCode.FAILED_PRECONDITION details = "there was an error operating on 'projects/<projectID>/locations/<location>/sessions/<sessionID>/streams/<streamID>': session`

    Error ini dapat terjadi saat operasi memerlukan waktu lebih lama daripada waktu tunggu, biasanya di pipeline yang berjalan lebih dari 6 jam. Untuk mengurangi masalah ini, beralihlah ke ekspor file.

  • Tingkat paralelisme bergantung pada metode baca:

    • Pembacaan langsung: Konektor I/O menghasilkan sejumlah aliran data dinamis, berdasarkan ukuran permintaan ekspor. Pipeline ini membaca aliran tersebut secara paralel langsung dari BigQuery.

    • Tugas ekspor: BigQuery menentukan jumlah file yang akan ditulis ke Cloud Storage. Jumlah file bergantung pada kueri dan volume data. Konektor I/O membaca file yang diekspor secara paralel.

Tabel berikut menunjukkan metrik performa untuk berbagai opsi baca I/O BigQuery. Beban kerja dijalankan di satu pekerja e2-standard2, menggunakan Apache Beam SDK 2.49.0 untuk Java. Mereka tidak menggunakan Runner v2.

100 Juta data | 1 kB | 1 kolom Throughput (byte) Throughput (elemen)
Storage Read 120 MBps 88.000 elemen per detik
Ekspor Avro 105 MBps 78.000 elemen per detik
Json Export 110 MBps 81.000 elemen per detik

Metrik ini didasarkan pada pipeline batch sederhana. Benchmark ini dimaksudkan untuk membandingkan performa antara konektor I/O, dan tidak selalu mewakili pipeline dunia nyata. Performa pipeline Dataflow bersifat kompleks, dan merupakan fungsi dari jenis VM, data yang diproses, performa sumber dan sink eksternal, serta kode pengguna. Metrik didasarkan pada menjalankan Java SDK, dan tidak mewakili karakteristik performa SDK bahasa lainnya. Untuk mengetahui informasi selengkapnya, lihat Performa IO Beam.

Contoh

Contoh kode berikut menggunakan konektor BigQueryIO dengan pembacaan tabel langsung. Untuk menggunakan tugas ekspor sebagai gantinya, hapus panggilan ke withMethod.

Membaca data berformat Avro

Contoh ini menunjukkan cara menggunakan konektor BigQueryIO untuk membaca rekaman yang diformat Avro.

Untuk membaca data BigQuery ke dalam rekaman berformat Avro, gunakan metode read(SerializableFunction). Metode ini menggunakan fungsi yang ditentukan aplikasi yang mengurai objek SchemaAndRecord dan menampilkan jenis data kustom. Output dari konektor adalah PCollection dari jenis data kustom Anda.

Kode berikut membaca PCollection<MyData> dari tabel BigQuery, dengan MyData adalah class yang ditentukan aplikasi.

Java

Untuk melakukan autentikasi ke Dataflow, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadAvro {

  // A custom datatype to hold a record from the source table.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    // Function to convert Avro records to MyData instances.
    public static class FromSchemaAndRecord
            implements SerializableFunction<SchemaAndRecord, MyData> {
      @Override public MyData apply(SchemaAndRecord elem) {
        MyData data = new MyData();
        GenericRecord record = elem.getRecord();
        data.name = ((Utf8) record.get("user_name")).toString();
        data.age = (Long) record.get("age");
        return data;
      }
    }
  }

  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into Avro records, using an application-defined parsing function.
        .apply(BigQueryIO.read(new MyData.FromSchemaAndRecord())
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(TypedRead.Method.DIRECT_READ))
        // The output from the previous step is a PCollection<MyData>.
        .apply(MapElements
            .into(TypeDescriptor.of(MyData.class))
            .via((MyData x) -> {
              System.out.printf("Name: %s, Age: %d%n", x.name, x.age);
              return x;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Metode read menggunakan antarmuka SerializableFunction<SchemaAndRecord, T>, yang menentukan fungsi untuk mengonversi dari rekaman Avro ke class data kustom. Dalam contoh kode sebelumnya, metode MyData.apply menerapkan fungsi konversi ini. Fungsi contoh mengurai kolom name dan age dari rekaman Avro dan menampilkan instance MyData.

Untuk menentukan tabel BigQuery yang akan dibaca, panggil metode from, seperti yang ditunjukkan dalam contoh sebelumnya. Untuk mengetahui informasi selengkapnya, lihat Nama tabel dalam dokumentasi konektor I/O BigQuery.

Membaca objek TableRow

Contoh ini menunjukkan cara menggunakan konektor BigQueryIO untuk membaca objek TableRow.

Metode readTableRows membaca data BigQuery ke dalam PCollection dari objek TableRow. Setiap TableRow adalah peta key-value pair yang menyimpan satu baris data tabel. Tentukan tabel BigQuery yang akan dibaca dengan memanggil metode from.

Kode berikut membaca PCollection<TableRows> dari tabel BigQuery.

Java

Untuk melakukan autentikasi ke Dataflow, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BiqQueryReadTableRows {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into TableRow objects.
        .apply(BigQueryIO.readTableRows()
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(Method.DIRECT_READ)
        )
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            // Use TableRow to access individual fields in the row.
            .via((TableRow row) -> {
              var name = (String) row.get("user_name");
              var age = (String) row.get("age");
              System.out.printf("Name: %s, Age: %s%n", name, age);
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Contoh ini juga menunjukkan cara mengakses nilai dari kamus TableRow. Nilai bilangan bulat dienkode sebagai string agar sesuai dengan format JSON yang diekspor BigQuery.

Langkah berikutnya