將資料匯出為 Protobuf 欄

本文件說明如何使用 BigQuery 使用者定義的函式 (UDF),將 BigQuery 資料匯出為 Protocol Buffers (Protobuf) 欄。

使用 Protobuf 欄的時間

BigQuery 提供多個內建函式,可用於格式化所選資料。其中一個選項是將多個欄值合併為單一 Protobuf 值,這樣做有以下好處:

  • 物件類型安全性。
  • 相較於 JSON,壓縮、資料傳輸時間和成本皆有所改善。
  • 靈活性,因為大多數程式設計語言都有處理 Protobuf 的程式庫。
  • 從多個資料欄讀取資料並建構單一物件時,可減少額外負擔。

雖然其他欄型別也可以提供型別安全性,但使用 Protobuf 欄可提供完整型別的物件,進而減少在應用程式層或管道其他部分所需的工作量。

不過,匯出 BigQuery 資料做為 Protobuf 欄位時,有以下限制:

  • Protobuf 欄未妥善建立索引或篩選。以 Protobuf 欄內容為依據進行搜尋可能效率較低。
  • 以 Protobuf 格式排序資料可能會很困難。

如果匯出工作流程受到這些限制的影響,您可以考慮使用其他方法匯出 BigQuery 資料:

  • 請使用排程查詢EXPORT DATA 陳述式,依日期或時間排序匯出的 BigQuery 資料,並排定定期匯出作業。BigQuery 支援將資料匯出為 Avro、CSV、JSON 和 Parquet 格式。
  • 使用 Dataflow,以 Avro 或 CSV 檔案格式匯出 BigQuery 資料。

必要的角色

如要取得匯出 BigQuery 資料為 Protobuf 欄所需的權限,請要求管理員為您授予專案的下列 IAM 角色:

如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。

您或許還可透過自訂角色或其他預先定義的角色取得必要權限。

建立 UDF

建立 UDF,將 BigQuery STRUCT 資料類型轉換為 Protobuf 欄:

  1. 在指令列中複製 bigquery-utils.git 存放區:

    git clone https://github.com/GoogleCloudPlatform/bigquery-utils.git
    
  2. 前往 Protobuf 匯出資料夾:

    cd bigquery-utils/tools/protobuf_export
    
  3. 使用 cp 指令或作業系統的檔案瀏覽器,將 Proto 檔案複製到 ./protos 子資料夾。

    ./protos 資料夾中已有名為 dummy.proto 的範例 Proto 檔案。

  4. 從 GitHub 存放區安裝必要套件:

    npm install
    
  5. 使用 webpack 將套件打包:

    npx webpack --config webpack.config.js --stats-error-details
    
  6. 找出 ./dist 子資料夾中的 pbwrapper.js 檔案,然後將檔案上傳至 Cloud Storage 值區

  7. 前往「BigQuery」頁面

    前往 BigQuery

  8. 使用查詢編輯器建立名為 toMyProtoMessage 的 UDF,以便從現有的 BigQuery 資料欄建立 Protobuf 欄:

    CREATE FUNCTION
      DATASET_ID.toMyProtoMessage(input STRUCT<INPUT_FIELDS>)
      RETURNS BYTES
        LANGUAGE js OPTIONS ( library=["gs://BUCKET_NAME/pbwrapper.js"]
    ) AS r"""
    let message = pbwrapper.setup("PROTO_PACKAGE.PROTO_MESSAGE")
    return pbwrapper.parse(message, input)
      """;
    

    更改下列內容:

    • DATASET_ID:包含 UDF 的資料集 ID。
    • INPUT_FIELDS:在 proto 檔案的 proto 訊息類型中使用的欄位,格式為 field_name_1 field_type_1 [, field_name_2 field_type_2, ...]

      您必須將使用底線的任何訊息類型欄位轉譯為駝峰式大寫。舉例來說,如果訊息類型如下所示,則輸入欄位值必須為 itemId int64, itemDescription string

      message ThisMessage {
        int64 item_id = 1;
        string item_description = 2;
      }
      
    • BUCKET_NAME:包含 pbwrapper.js 檔案的 Cloud Storage 值區名稱。

    • PROTO_PACKAGE:proto 檔案的package

    • PROTO_MESSAGE:proto 檔案的訊息類型。

    舉例來說,如果您使用提供的 dummy.proto 檔案,CREATE FUNCTION 陳述式會如下所示:

    CREATE OR REPLACE FUNCTION
      mydataset.toMyProtoMessage(input STRUCT<dummyField STRING>)
      RETURNS BYTES
        LANGUAGE js OPTIONS ( library=["gs://mybucket/pbwrapper.js"]
    ) AS r"""
    let message = pbwrapper.setup("dummypackage.DummyMessage")
    return pbwrapper.parse(message, input)
      """;
    

將資料欄格式化為 Protobuf 值

執行 toMyProtoMessage UDF,將 BigQuery 資料表欄格式化為 Protobuf 值:

  SELECT
    UDF_DATASET_ID.toMyProtoMessage(STRUCT(INPUT_COLUMNS)) AS protoResult
  FROM
    `PROJECT_ID.DATASET_ID.TABLE_NAME`
  LIMIT
    100;

更改下列內容:

  • UDF_DATASET_ID:包含 UDF 的資料集 ID。
  • INPUT_COLUMNS:要以 column_name_1 [, column_name_2, ...] 格式轉換為 Protobuf 值的資料欄名稱。資料欄可以是任何支援的標量值類型或非標量類型,包括 ARRAYSTRUCT。輸入資料欄必須符合 Proto 訊息類型欄位的類型和數量。
  • PROJECT_ID:包含表格的專案 ID。如果資料集位於目前專案中,您可以略過識別專案的步驟。
  • DATASET_ID:包含資料表的資料集 ID。
  • TABLE_NAME:包含要格式化的資料欄的表格名稱。

舉例來說,如果您使用的是基於 dummy.prototoMyProtoMessage UDF,則下列 SELECT 陳述式可正常運作:

SELECT
  mydataset.toMyProtoMessage(STRUCT(word)) AS protoResult
FROM
  `bigquery-public-data.samples.shakespeare`
LIMIT 100;

使用 Protobuf 值

由於 BigQuery 資料是以 Protobuf 格式匯出,您現在可以將資料視為完整型別的物件或結構體使用。

以下程式碼範例提供幾種處理或使用匯出的資料的方式:

Go

// package Main queries Google BigQuery.
package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"

	"cloud.google.com/go/bigquery"
	"google.golang.org/api/iterator"
	"google.golang.org/Protobuf/proto"

	pb "path/to/proto/file_proto"
)

const (
	projectID = "your-project-id"
)

// Row contains returned row data from bigquery.
type Row struct {
	RowKey string `bigquery:"RowKey"`
	Proto  []byte `bigquery:"ProtoResult"`
}

func main() {
	ctx := context.Background()

	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		log.Fatalf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	rows, err := query(ctx, client)
	if err != nil {
		log.Fatal(err)
	}
	if err := printResults(os.Stdout, rows); err != nil {
		log.Fatal(err)
	}
}

// query returns a row iterator suitable for reading query results.
func query(ctx context.Context, client *bigquery.Client) (*bigquery.RowIterator, error) {

	query := client.Query(
		`SELECT 
  concat(word, ":", corpus) as RowKey, 
  <dataset-id>.toMyProtoMessage(
    STRUCT(
      word, 
      CAST(word_count AS BIGNUMERIC)
    )
  ) AS ProtoResult 
FROM 
  ` + "` bigquery - public - data.samples.shakespeare `" + ` 
LIMIT 
  100;
`)
	return query.Read(ctx)
}

// printResults prints results from a query.
func printResults(w io.Writer, iter *bigquery.RowIterator) error {
	for {
		var row Row
		err := iter.Next(&row)
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return fmt.Errorf("error iterating through results: %w", err)
		}
		message := &pb.TestMessage{}
		if err = proto.Unmarshal(row.Proto, message); err != nil {
			return err
		}
		fmt.Fprintf(w, "rowKey: %s, message: %v\n", row.RowKey, message)
	}
}

Java

package proto;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import path.to.proto.TestMessage;
import java.util.UUID;

/** Queries Google BigQuery */
public final class Main {
  public static void main(String[] args) throws Exception {
    String projectId = "your-project-id";
    BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(projectId).build().getService();

    QueryJobConfiguration queryConfig =
        QueryJobConfiguration.newBuilder(
                " SELECT "
                    + "concat(word , \":\",corpus) as RowKey,"
                    + "<dataset-id>.toMyProtoMessage(STRUCT(word, "
                    + "CAST(word_count AS BIGNUMERIC))) AS ProtoResult "
                    + "FROM "
                    + "`bigquery-public-data.samples.shakespeare` "
                    + "ORDER BY word_count DESC "
                    + "LIMIT 20")
            .setUseLegacySql(false)
            .build();

    // Create a job ID so that we can safely retry.
    JobId jobId = JobId.of(UUID.randomUUID().toString());
    Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

    // Wait for the query to complete.
    queryJob = queryJob.waitFor();

    // Check for errors
    if (queryJob == null) {
      throw new RuntimeException("Job no longer exists");
    } else if (queryJob.getStatus().getError() != null) {
      // You can also look at queryJob.getStatus().getExecutionErrors() for all
      // errors, not just the latest one.
      throw new RuntimeException(queryJob.getStatus().getError().toString());
    }

    // Get the results.
    TableResult result = queryJob.getQueryResults();

    // Print all pages of the results.
    for (FieldValueList row : result.iterateAll()) {
      String key = row.get("RowKey").getStringValue();
      byte[] message = row.get("ProtoResult").getBytesValue();
      TestMessage testMessage = TestMessage.parseFrom(message);
      System.out.printf("rowKey: %s, message: %s\n", key, testMessage);
    }
  }
}

Python

"""Queries Google BigQuery."""

from google.cloud import bigquery
from path.to.proto import awesome_pb2


def main():
  project_id = "your-project-id"
  client = bigquery.Client(project=project_id)
  query_job = client.query(query="""
               SELECT
			concat(word , ":",corpus) as RowKey,
			<dataset-id>.toMyProtoMessage(
			    STRUCT(
			      word, 
			      CAST(word_count AS BIGNUMERIC)
			    )
			  ) AS ProtoResult 
		FROM
				  `bigquery-public-data.samples.shakespeare`
		ORDER BY word_count DESC
		LIMIT 20
    """)
  rows = query_job.result()
  for row in rows:
    message = awesome_pb2.TestMessage()
    message.ParseFromString(row.get("ProtoResult"))
    print(
        "rowKey: {}, message: {}".format(row.get("RowKey"), message)
    )