Mengekspor data sebagai kolom Protobuf

Dokumen ini menjelaskan cara mengekspor data BigQuery sebagai kolom Buffering Protokol (Protobuf) menggunakan fungsi yang ditentukan pengguna (UDF) BigQuery.

Kapan kolom Protobuf digunakan

BigQuery menawarkan sejumlah fungsi bawaan untuk memformat data yang dipilih. Salah satu opsinya adalah menggabungkan beberapa nilai kolom menjadi satu nilai Protobuf, yang memiliki manfaat berikut:

  • Keamanan jenis objek.
  • Kompresi, waktu transfer data, dan biaya yang lebih baik dibandingkan dengan JSON.
  • Fleksibilitas, karena sebagian besar bahasa pemrograman memiliki library untuk menangani Protobuf.
  • Lebih sedikit overhead saat membaca dari beberapa kolom dan membangun satu objek.

Meskipun jenis kolom lainnya juga dapat memberikan keamanan jenis, penggunaan kolom Protobuf akan menyediakan objek fully-typed, yang dapat mengurangi jumlah pekerjaan yang perlu dilakukan pada lapisan aplikasi atau di bagian lain dari pipeline.

Namun, ada batasan untuk mengekspor data BigQuery sebagai kolom Protobuf:

  • Kolom protobuf tidak diindeks atau difilter dengan baik. Mencari berdasarkan isi kolom Protobuf bisa menjadi kurang efektif.
  • Mengurutkan data dalam format Protobuf bisa jadi sulit.

Jika batasan ini berlaku untuk alur kerja ekspor, Anda dapat mempertimbangkan metode lain untuk mengekspor data BigQuery:

  • Gunakan kueri terjadwal dengan pernyataan EXPORT DATA untuk mengurutkan data BigQuery yang diekspor menurut tanggal atau waktu, dan untuk menjadwalkan ekspor secara berulang. BigQuery mendukung pengeksporan data ke dalam format Avro, CSV, JSON, dan Parquet.
  • Gunakan Dataflow untuk mengekspor data BigQuery dalam format file Avro atau CSV.

Peran yang diperlukan

Untuk mendapatkan izin yang Anda perlukan untuk mengekspor data BigQuery sebagai kolom Protobuf, minta administrator Anda untuk memberi Anda peran IAM berikut di project Anda:

Untuk mengetahui informasi selengkapnya tentang cara memberikan peran, lihat Mengelola akses ke project, folder, dan organisasi.

Anda mungkin juga bisa mendapatkan izin yang diperlukan melalui peran khusus atau peran bawaan lainnya.

Membuat UDF

Buat UDF yang mengonversi jenis data STRUCT BigQuery menjadi kolom Protobuf:

  1. Pada command line, clone repositori bigquery-utils.git:

    git clone https://github.com/GoogleCloudPlatform/bigquery-utils.git
    
  2. Buka folder ekspor Protobuf:

    cd bigquery-utils/tools/protobuf_export
    
  3. Gunakan perintah cp atau browser file sistem operasi Anda untuk menyalin file proto ke folder turunan ./protos.

    Sudah ada file proto contoh bernama dummy.proto di folder ./protos.

  4. Instal paket yang diperlukan dari repositori GitHub:

    npm install
    
  5. Paketkan paket menggunakan webpack:

    npx webpack --config webpack.config.js --stats-error-details
    
  6. Temukan file pbwrapper.js di folder turunan ./dist, lalu upload file tersebut ke bucket Cloud Storage.

  7. Buka halaman BigQuery.

    Buka BigQuery

  8. Dengan menggunakan editor kueri, buat UDF bernama toMyProtoMessage yang membangun kolom Protobuf dari kolom tabel BigQuery yang ada:

    CREATE FUNCTION
      DATASET_ID.toMyProtoMessage(input STRUCT<INPUT_FIELDS>)
      RETURNS BYTES
        LANGUAGE js OPTIONS ( library=["gs://BUCKET_NAME/pbwrapper.js"]
    ) AS r"""
    let message = pbwrapper.setup("PROTO_PACKAGE.PROTO_MESSAGE")
    return pbwrapper.parse(message, input)
      """;
    

    Ganti kode berikut:

    • DATASET_ID: ID set data yang akan berisi UDF.
    • INPUT_FIELDS: kolom yang digunakan dalam jenis pesan proto untuk file proto, dalam format field_name_1 field_type_1 [, field_name_2 field_type_2, ...].

      Anda harus menerjemahkan semua kolom jenis pesan yang menggunakan garis bawah menjadi camel case. Misalnya, jika jenis pesan terlihat seperti berikut, maka nilai kolom input harus itemId int64, itemDescription string:

      message ThisMessage {
        int64 item_id = 1;
        string item_description = 2;
      }
      
    • BUCKET_NAME: nama bucket Cloud Storage yang berisi file pbwrapper.js.

    • PROTO_PACKAGE: paket untuk file proto.

    • PROTO_MESSAGE: jenis pesan untuk file proto.

    Misalnya, jika Anda menggunakan file dummy.proto yang disediakan, pernyataan CREATE FUNCTION akan terlihat seperti berikut:

    CREATE OR REPLACE FUNCTION
      mydataset.toMyProtoMessage(input STRUCT<dummyField STRING>)
      RETURNS BYTES
        LANGUAGE js OPTIONS ( library=["gs://mybucket/pbwrapper.js"]
    ) AS r"""
    let message = pbwrapper.setup("dummypackage.DummyMessage")
    return pbwrapper.parse(message, input)
      """;
    

Memformat kolom sebagai nilai Protobuf

Jalankan UDF toMyProtoMessage untuk memformat kolom tabel BigQuery sebagai nilai Protobuf:

  SELECT
    UDF_DATASET_ID.toMyProtoMessage(STRUCT(INPUT_COLUMNS)) AS protoResult
  FROM
    `PROJECT_ID.DATASET_ID.TABLE_NAME`
  LIMIT
    100;

Ganti kode berikut:

  • UDF_DATASET_ID: ID set data yang berisi UDF.
  • INPUT_COLUMNS: nama kolom yang akan diformat sebagai nilai Protobuf, dalam format column_name_1 [, column_name_2, ...]. Kolom dapat berupa jenis nilai skalar yang didukung atau jenis non-skalar, termasuk ARRAY dan STRUCT. Kolom input harus cocok dengan jenis dan jumlah kolom jenis pesan proto.
  • PROJECT_ID: ID project yang berisi tabel. Anda dapat melewati identifikasi project jika set data berada di project saat ini.
  • DATASET_ID: ID set data yang berisi tabel.
  • TABLE_NAME: nama tabel yang berisi kolom yang akan diformat.

Misalnya, jika Anda menggunakan UDF toMyProtoMessage berdasarkan dummy.proto, pernyataan SELECT berikut akan berfungsi:

SELECT
  mydataset.toMyProtoMessage(STRUCT(word)) AS protoResult
FROM
  `bigquery-public-data.samples.shakespeare`
LIMIT 100;

Menggunakan nilai Protobuf

Dengan data BigQuery yang diekspor dalam format Protobuf, Anda kini dapat menggunakan data sebagai objek atau struct yang memiliki jenis lengkap.

Contoh kode berikut memberikan beberapa contoh cara untuk memproses atau menggunakan data yang diekspor:

Go

// package Main queries Google BigQuery.
package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"

	"cloud.google.com/go/bigquery"
	"google.golang.org/api/iterator"
	"google.golang.org/Protobuf/proto"

	pb "path/to/proto/file_proto"
)

const (
	projectID = "your-project-id"
)

// Row contains returned row data from bigquery.
type Row struct {
	RowKey string `bigquery:"RowKey"`
	Proto  []byte `bigquery:"ProtoResult"`
}

func main() {
	ctx := context.Background()

	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		log.Fatalf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	rows, err := query(ctx, client)
	if err != nil {
		log.Fatal(err)
	}
	if err := printResults(os.Stdout, rows); err != nil {
		log.Fatal(err)
	}
}

// query returns a row iterator suitable for reading query results.
func query(ctx context.Context, client *bigquery.Client) (*bigquery.RowIterator, error) {

	query := client.Query(
		`SELECT 
  concat(word, ":", corpus) as RowKey, 
  <dataset-id>.toMyProtoMessage(
    STRUCT(
      word, 
      CAST(word_count AS BIGNUMERIC)
    )
  ) AS ProtoResult 
FROM 
  ` + "` bigquery - public - data.samples.shakespeare `" + ` 
LIMIT 
  100;
`)
	return query.Read(ctx)
}

// printResults prints results from a query.
func printResults(w io.Writer, iter *bigquery.RowIterator) error {
	for {
		var row Row
		err := iter.Next(&row)
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return fmt.Errorf("error iterating through results: %w", err)
		}
		message := &pb.TestMessage{}
		if err = proto.Unmarshal(row.Proto, message); err != nil {
			return err
		}
		fmt.Fprintf(w, "rowKey: %s, message: %v\n", row.RowKey, message)
	}
}

Java

package proto;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import path.to.proto.TestMessage;
import java.util.UUID;

/** Queries Google BigQuery */
public final class Main {
  public static void main(String[] args) throws Exception {
    String projectId = "your-project-id";
    BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(projectId).build().getService();

    QueryJobConfiguration queryConfig =
        QueryJobConfiguration.newBuilder(
                " SELECT "
                    + "concat(word , \":\",corpus) as RowKey,"
                    + "<dataset-id>.toMyProtoMessage(STRUCT(word, "
                    + "CAST(word_count AS BIGNUMERIC))) AS ProtoResult "
                    + "FROM "
                    + "`bigquery-public-data.samples.shakespeare` "
                    + "ORDER BY word_count DESC "
                    + "LIMIT 20")
            .setUseLegacySql(false)
            .build();

    // Create a job ID so that we can safely retry.
    JobId jobId = JobId.of(UUID.randomUUID().toString());
    Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

    // Wait for the query to complete.
    queryJob = queryJob.waitFor();

    // Check for errors
    if (queryJob == null) {
      throw new RuntimeException("Job no longer exists");
    } else if (queryJob.getStatus().getError() != null) {
      // You can also look at queryJob.getStatus().getExecutionErrors() for all
      // errors, not just the latest one.
      throw new RuntimeException(queryJob.getStatus().getError().toString());
    }

    // Get the results.
    TableResult result = queryJob.getQueryResults();

    // Print all pages of the results.
    for (FieldValueList row : result.iterateAll()) {
      String key = row.get("RowKey").getStringValue();
      byte[] message = row.get("ProtoResult").getBytesValue();
      TestMessage testMessage = TestMessage.parseFrom(message);
      System.out.printf("rowKey: %s, message: %s\n", key, testMessage);
    }
  }
}

Python

"""Queries Google BigQuery."""

from google.cloud import bigquery
from path.to.proto import awesome_pb2


def main():
  project_id = "your-project-id"
  client = bigquery.Client(project=project_id)
  query_job = client.query(query="""
               SELECT
			concat(word , ":",corpus) as RowKey,
			<dataset-id>.toMyProtoMessage(
			    STRUCT(
			      word, 
			      CAST(word_count AS BIGNUMERIC)
			    )
			  ) AS ProtoResult 
		FROM
				  `bigquery-public-data.samples.shakespeare`
		ORDER BY word_count DESC
		LIMIT 20
    """)
  rows = query_job.result()
  for row in rows:
    message = awesome_pb2.TestMessage()
    message.ParseFromString(row.get("ProtoResult"))
    print(
        "rowKey: {}, message: {}".format(row.get("RowKey"), message)
    )