Menulis dari Dataflow ke BigQuery

Dokumen ini menjelaskan cara menulis data dari Dataflow ke BigQuery.

Ringkasan

Untuk sebagian besar kasus penggunaan, pertimbangkan untuk menggunakan I/O Terkelola untuk menulis ke BigQuery. I/O Terkelola menyediakan fitur seperti upgrade otomatis dan API konfigurasi yang konsisten. Saat menulis ke BigQuery, I/O Terkelola secara otomatis memilih metode penulisan terbaik untuk tugas batch atau streaming.

Jika Anda memerlukan penyesuaian performa yang lebih canggih, pertimbangkan untuk menggunakan konektor BigQueryIO. Untuk mengetahui informasi selengkapnya, lihat Menggunakan konektor BigQueryIO dalam dokumen ini.

Performa

Tabel berikut menunjukkan metrik performa untuk berbagai workload. Beban kerja ini dijalankan di satu pekerja e2-standard2, menggunakan Apache Beam SDK 2.49.0 untuk Java. Mereka tidak menggunakan Runner v2.

100 Juta data | 1 kB | 1 kolom Throughput (byte) Throughput (elemen)
Storage Write 55 MBps 54.000 elemen per detik
Avro Load 78 MBps 77.000 elemen per detik
Json Load 54 MBps 53.000 elemen per detik

Metrik ini didasarkan pada pipeline batch sederhana. Benchmark ini dimaksudkan untuk membandingkan performa antara konektor I/O, dan tidak selalu mewakili pipeline dunia nyata. Performa pipeline Dataflow bersifat kompleks, dan merupakan fungsi dari jenis VM, data yang diproses, performa sumber dan sink eksternal, serta kode pengguna. Metrik didasarkan pada menjalankan Java SDK, dan tidak mewakili karakteristik performa SDK bahasa lainnya. Untuk mengetahui informasi selengkapnya, lihat Performa IO Beam.

Menggunakan konektor BigQueryIO

Konektor I/O BigQuery mendukung metode berikut untuk menulis ke BigQuery:

  • STORAGE_WRITE_API. Dalam mode ini, konektor melakukan penulisan langsung ke penyimpanan BigQuery, menggunakan BigQuery Storage Write API. Storage Write API menggabungkan penyerapan streaming dan pemuatan batch ke dalam satu API berperforma tinggi. Mode ini menjamin semantik tepat satu kali.
  • STORAGE_API_AT_LEAST_ONCE. Mode ini juga menggunakan Storage Write API, tetapi menyediakan semantik minimal satu kali. Mode ini menghasilkan latensi yang lebih rendah untuk sebagian besar pipeline. Namun, penulisan duplikat dapat terjadi.
  • FILE_LOADS. Dalam mode ini, konektor menulis data input ke file penyiapan di Cloud Storage. Kemudian, tugas ini menjalankan tugas pemuatan BigQuery untuk memuat data ke BigQuery. Mode ini adalah default untuk PCollections yang terikat, yang paling umum ditemukan dalam pipeline batch.
  • STREAMING_INSERTS. Dalam mode ini, konektor menggunakan API streaming lama. Mode ini adalah default untuk PCollections yang tidak terikat, tetapi tidak direkomendasikan untuk project baru.

Saat memilih metode penulisan, pertimbangkan poin-poin berikut:

  • Untuk tugas streaming, pertimbangkan untuk menggunakan STORAGE_WRITE_API atau STORAGE_API_AT_LEAST_ONCE, karena mode ini menulis langsung ke penyimpanan BigQuery, tanpa menggunakan file penyiapan perantara.
  • Jika Anda menjalankan pipeline menggunakan mode streaming minimal satu kali, setel mode penulisan ke STORAGE_API_AT_LEAST_ONCE. Setelan ini lebih efisien dan sesuai dengan semantik mode streaming minimal satu kali.
  • Pemuatan file dan Storage Write API memiliki kuota dan batas yang berbeda.
  • Tugas pemuatan menggunakan gabungan slot BigQuery bersama atau slot yang dicadangkan. Untuk menggunakan slot yang direservasi, jalankan tugas pemuatan di project dengan penetapan pemesanan jenis PIPELINE. Tugas pemuatan gratis jika Anda menggunakan gabungan slot BigQuery bersama. Namun, BigQuery tidak menjamin kapasitas yang tersedia dari gabungan slot bersama. Untuk mengetahui informasi selengkapnya, lihat Pengantar reservasi.

Keparalelan

  • Untuk FILE_LOADS dan STORAGE_WRITE_API dalam pipeline streaming, konektor memecah data ke sejumlah file atau aliran. Secara umum, sebaiknya panggil withAutoSharding untuk mengaktifkan pemartisian otomatis.

  • Untuk FILE_LOADS dalam pipeline batch, konektor menulis data ke file yang dipartisi, yang kemudian dimuat ke BigQuery secara paralel.

  • Untuk STORAGE_WRITE_API di pipeline batch, setiap pekerja membuat satu atau beberapa aliran untuk menulis ke BigQuery, yang ditentukan oleh jumlah total sharding.

  • Untuk STORAGE_API_AT_LEAST_ONCE, ada satu aliran penulisan default. Beberapa pekerja menambahkan ke aliran ini.

Praktik terbaik

  • Storage Write API memiliki batas kuota. Konektor menangani batas ini untuk sebagian besar pipeline. Namun, beberapa skenario dapat menghabiskan aliran Storage Write API yang tersedia. Misalnya, masalah ini dapat terjadi di pipeline yang menggunakan sharding otomatis dan penskalaan otomatis dengan sejumlah besar tujuan, terutama dalam tugas yang berjalan lama dengan beban kerja yang sangat bervariasi. Jika masalah ini terjadi, pertimbangkan untuk menggunakan STORAGE_WRITE_API_AT_LEAST_ONCE, yang menghindari masalah tersebut.

  • Gunakan metrikGoogle Cloud untuk memantau penggunaan kuota Storage Write API Anda.

  • Saat menggunakan pemuatan file, Avro biasanya lebih unggul daripada JSON. Untuk menggunakan Avro, panggil withAvroFormatFunction.

  • Secara default, tugas pemuatan berjalan di project yang sama dengan tugas Dataflow. Untuk menentukan project lain, panggil withLoadJobProjectId.

  • Saat menggunakan Java SDK, pertimbangkan untuk membuat class yang merepresentasikan skema tabel BigQuery. Kemudian, panggil useBeamSchema di pipeline Anda untuk mengonversi secara otomatis antara jenis Row Apache Beam dan TableRow BigQuery. Untuk contoh class skema, lihat ExampleModel.java.

  • Jika Anda memuat tabel dengan skema kompleks yang berisi ribuan kolom, pertimbangkan untuk memanggil withMaxBytesPerPartition untuk menetapkan ukuran maksimum yang lebih kecil untuk setiap tugas pemuatan.

  • Secara default, BigQueryIO menggunakan setelan Storage Write API yang wajar untuk sebagian besar pipeline. Namun, jika melihat masalah performa, Anda dapat menetapkan opsi pipeline untuk menyesuaikan setelan ini. Untuk mengetahui informasi selengkapnya, lihat Menyesuaikan Storage Write API di dokumentasi Apache Beam.

Pipeline streaming

Rekomendasi berikut berlaku untuk pipeline streaming.

  • Untuk pipeline streaming, sebaiknya gunakan Storage Write API (STORAGE_WRITE_API atau STORAGE_API_AT_LEAST_ONCE).

  • Pipeline streaming dapat menggunakan pemuatan file, tetapi pendekatan ini memiliki kekurangan:

    • Hal ini memerlukan windowing untuk menulis file. Anda tidak dapat menggunakan jendela global.
    • BigQuery memuat file berdasarkan upaya terbaik saat menggunakan gabungan slot bersama. Mungkin ada penundaan yang signifikan antara saat rekaman ditulis dan saat tersedia di BigQuery.
    • Jika tugas pemuatan gagal — misalnya, karena data yang buruk atau ketidakcocokan skema — seluruh pipeline akan gagal.
  • Sebaiknya gunakan STORAGE_WRITE_API_AT_LEAST_ONCE jika memungkinkan. Hal ini dapat menyebabkan duplikat data ditulis ke BigQuery, tetapi lebih murah dan lebih skalabel daripada STORAGE_WRITE_API.

  • Secara umum, hindari penggunaan STREAMING_INSERTS. Streaming insert lebih mahal daripada Storage Write API, dan performanya tidak sebaik Storage Write API.

  • Sharding data dapat meningkatkan performa di pipeline streaming. Untuk sebagian besar pipeline, penyeimbangan otomatis adalah titik awal yang baik. Namun, Anda dapat menyesuaikan sharding sebagai berikut:

  • Jika Anda menggunakan penyisipan streaming, sebaiknya tetapkan retryTransientErrors sebagai kebijakan percobaan ulang.

Pipeline batch

Rekomendasi berikut berlaku untuk pipeline batch.

  • Untuk sebagian besar pipeline batch besar, sebaiknya coba FILE_LOADS terlebih dahulu. Pipeline batch dapat menggunakan STORAGE_WRITE_API, tetapi kemungkinan akan melebihi batas kuota dalam skala besar (lebih dari 1.000 vCPU) atau jika pipeline serentak sedang berjalan. Apache Beam tidak membatasi jumlah maksimum aliran penulisan untuk tugas batch STORAGE_WRITE_API, sehingga tugas akhirnya mencapai batas BigQuery Storage API.

  • Saat menggunakan FILE_LOADS, Anda mungkin kehabisan gabungan slot BigQuery bersama atau gabungan slot yang dicadangkan. Jika Anda mengalami kegagalan semacam ini, coba pendekatan berikut:

    • Kurangi jumlah maksimum pekerja atau ukuran pekerja untuk tugas.
    • Membeli lebih banyak slot yang dipesan.
    • Pertimbangkan untuk menggunakan STORAGE_WRITE_API.
  • Pipeline kecil hingga sedang (<1.000 vCPU) mungkin diuntungkan dengan menggunakan STORAGE_WRITE_API. Untuk tugas yang lebih kecil ini, pertimbangkan untuk menggunakan STORAGE_WRITE_API jika Anda menginginkan antrean pesan yang tidak terkirim atau saat gabungan slot bersama FILE_LOADS tidak mencukupi.

  • Jika Anda dapat mentoleransi data duplikat, pertimbangkan untuk menggunakan STORAGE_WRITE_API_AT_LEAST_ONCE. Mode ini dapat menyebabkan duplikat data ditulis ke BigQuery, tetapi mungkin lebih murah daripada opsi STORAGE_WRITE_API.

  • Mode penulisan yang berbeda mungkin berperforma berbeda berdasarkan karakteristik pipeline Anda. Lakukan percobaan untuk menemukan mode penulisan terbaik untuk beban kerja Anda.

Menangani error tingkat baris

Bagian ini menjelaskan cara menangani error yang mungkin terjadi di tingkat baris, misalnya karena data input yang salah format atau ketidakcocokan skema.

Untuk Storage Write API, baris yang tidak dapat ditulis akan ditempatkan ke dalam PCollection terpisah. Untuk mendapatkan koleksi ini, panggil getFailedStorageApiInserts pada objek WriteResult. Untuk contoh pendekatan ini, lihat Streaming data ke BigQuery.

Sebaiknya kirim error ke antrean atau tabel pesan yang tidak terkirim, untuk diproses nanti. Untuk mengetahui informasi selengkapnya tentang pola ini, lihat pola pesan yang tidak terkirim.BigQueryIO

Untuk FILE_LOADS, jika terjadi error saat memuat data, tugas pemuatan akan gagal dan pipeline akan memunculkan pengecualian runtime. Anda dapat melihat error di log Dataflow atau melihat histori tugas BigQuery. Konektor I/O tidak menampilkan informasi tentang setiap baris yang gagal.

Untuk mengetahui informasi selengkapnya tentang pemecahan masalah error, lihat Error konektor BigQuery.

Contoh

Contoh berikut menunjukkan cara menggunakan Dataflow untuk menulis ke BigQuery. Contoh ini menggunakan konektor BigQueryIO.

Menulis ke tabel yang ada

Contoh berikut membuat pipeline batch yang menulis PCollection<MyData> ke BigQuery, dengan MyData adalah jenis data kustom.

Metode BigQueryIO.write() menampilkan jenis BigQueryIO.Write<T>, yang digunakan untuk mengonfigurasi operasi penulisan. Untuk mengetahui informasi selengkapnya, lihat Menulis ke tabel dalam dokumentasi Apache Beam. Contoh kode ini menulis ke tabel yang ada (CREATE_NEVER) dan menambahkan baris baru ke tabel (WRITE_APPEND).

Java

Untuk melakukan autentikasi ke Dataflow, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import com.google.api.services.bigquery.model.TableRow;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWrite {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to an exiting BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API));
    pipeline.run().waitUntilFinish();
  }
}

Menulis ke tabel baru atau yang sudah ada

Contoh berikut membuat tabel baru jika tabel tujuan tidak ada, dengan menyetel disposisi pembuatan ke CREATE_IF_NEEDED. Saat menggunakan opsi ini, Anda harus memberikan skema tabel. Konektor menggunakan skema ini jika membuat tabel baru.

Java

Untuk melakukan autentikasi ke Dataflow, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class BigQueryWriteWithSchema {
  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    public MyData() {}

    public MyData(String name, Long age) {
      this.name = name;
      this.age = age;
    }
  }

  public static void main(String[] args) {
    // Example source data.
    final List<MyData> data = Arrays.asList(
        new MyData("Alice", 40L),
        new MyData("Bob", 30L),
        new MyData("Charlie", 20L)
    );

    // Define a table schema. A schema is required for write disposition CREATE_IF_NEEDED.
    TableSchema schema = new TableSchema()
        .setFields(
            Arrays.asList(
                new TableFieldSchema()
                    .setName("user_name")
                    .setType("STRING")
                    .setMode("REQUIRED"),
                new TableFieldSchema()
                    .setName("age")
                    .setType("INT64") // Defaults to NULLABLE
            )
        );

    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Create an in-memory PCollection of MyData objects.
        .apply(Create.of(data))
        // Write the data to a new or existing BigQuery table.
        .apply(BigQueryIO.<MyData>write()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withFormatFunction(
                (MyData x) -> new TableRow().set("user_name", x.name).set("age", x.age))
            .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
            .withSchema(schema)
            .withMethod(Write.Method.STORAGE_WRITE_API)
        );
    pipeline.run().waitUntilFinish();
  }
}

Melakukan streaming data ke BigQuery

Contoh berikut menunjukkan cara melakukan streaming data menggunakan semantik tepat satu kali, dengan menetapkan mode penulisan ke STORAGE_WRITE_API

Tidak semua pipeline streaming memerlukan semantik tepat satu kali. Misalnya, Anda mungkin dapat menghapus duplikat secara manual dari tabel tujuan. Jika kemungkinan data duplikat dapat diterima untuk skenario Anda, pertimbangkan untuk menggunakan semantik setidaknya sekali dengan menyetel metode penulisan ke STORAGE_API_AT_LEAST_ONCE. Metode ini umumnya lebih efisien dan menghasilkan latensi yang lebih rendah untuk sebagian besar pipeline.

Java

Untuk melakukan autentikasi ke Dataflow, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, lihat Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class BigQueryStreamExactlyOnce {
  // Create a PTransform that sends simulated streaming data. In a real application, the data
  // source would be an external source, such as Pub/Sub.
  private static TestStream<String> createEventSource() {
    Instant startTime = new Instant(0);
    return TestStream.create(StringUtf8Coder.of())
        .advanceWatermarkTo(startTime)
        .addElements(
            TimestampedValue.of("Alice,20", startTime),
            TimestampedValue.of("Bob,30",
                startTime.plus(Duration.standardSeconds(1))),
            TimestampedValue.of("Charles,40",
                startTime.plus(Duration.standardSeconds(2))),
            TimestampedValue.of("Dylan,Invalid value",
                startTime.plus(Duration.standardSeconds(2))))
        .advanceWatermarkToInfinity();
  }

  public static PipelineResult main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);
    options.setStreaming(true);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Add a streaming data source.
        .apply(createEventSource())
        // Map the event data into TableRow objects.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            .via((String x) -> {
              String[] columns = x.split(",");
              return new TableRow().set("user_name", columns[0]).set("age", columns[1]);
            }))
        // Write the rows to BigQuery
        .apply(BigQueryIO.writeTableRows()
            .to(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
            .withWriteDisposition(WriteDisposition.WRITE_APPEND)
            .withMethod(Write.Method.STORAGE_WRITE_API)
            // For exactly-once processing, set the triggering frequency.
            .withTriggeringFrequency(Duration.standardSeconds(5)))
        // Get the collection of write errors.
        .getFailedStorageApiInserts()
        .apply(MapElements.into(TypeDescriptors.strings())
            // Process each error. In production systems, it's useful to write the errors to
            // another destination, such as a dead-letter table or queue.
            .via(
                x -> {
                  System.out.println("Failed insert: " + x.getErrorMessage());
                  System.out.println("Row: " + x.getRow());
                  return "";
                }));
    return pipeline.run();
  }
}

Langkah berikutnya