Daten als Protobuf-Spalten exportieren

In diesem Dokument wird beschrieben, wie Sie BigQuery-Daten als Protokollpufferspalten (Protobuf) mit benutzerdefinierten BigQuery-Funktionen (UDFs) exportieren können.

Wann werden Protobuf-Spalten verwendet?

BigQuery bietet eine Reihe von integrierten Funktionen zum Formatieren ausgewählter Daten. Eine Möglichkeit besteht darin, mehrere Spaltenwerte in einem einzigen Protobuf-Wert zusammenzuführen. Dies hat folgende Vorteile:

  • Sicherheit des Objekttyps.
  • Verbesserte Komprimierung, Datenübertragungszeit und Kosten im Vergleich zu JSON.
  • Flexibilität, da die meisten Programmiersprachen Bibliotheken für die Verarbeitung von Protobuf haben.
  • Weniger Aufwand beim Lesen aus mehreren Spalten und Erstellen eines einzelnen Objekts.

Andere Spaltentypen können auch Typensicherheit bieten. Bei Verwendung von Protobuf-Spalten steht jedoch ein vollständig typisiertes Objekt zur Verfügung, wodurch der Arbeitsaufwand für die Anwendungsebene oder einen anderen Teil der Pipeline reduziert werden kann.

Es gibt jedoch Einschränkungen beim Exportieren von BigQuery-Daten als Protobuf-Spalten:

  • Protobuf-Spalten sind nicht gut indexiert oder gefiltert. Die Suche nach dem Inhalt der Protobuf-Spalten kann weniger effektiv sein.
  • Das Sortieren von Daten im Protobuf-Format kann schwierig sein.

Wenn diese Einschränkungen für den Exportworkflow gelten, können Sie andere Methoden zum Exportieren von BigQuery-Daten in Betracht ziehen:

  • Verwenden Sie geplante Abfragen mit EXPORT DATA-Anweisungen, um die exportierten BigQuery-Daten nach Datum oder Uhrzeit zu sortieren und Exporte regelmäßig zu planen. BigQuery unterstützt den Export von Daten in Avro-, CSV-, JSON- und Parquet-Formate.
  • Mit Dataflow können Sie BigQuery-Daten entweder im Avro- oder CSV-Dateiformat exportieren.

Erforderliche Rollen

Bitten Sie Ihren Administrator, Ihnen die folgenden IAM-Rollen für Ihr Projekt zuzuweisen, um die Berechtigungen zu erhalten, die Sie zum Exportieren von BigQuery-Daten als Protobuf-Spalten benötigen:

Weitere Informationen zum Zuweisen von Rollen finden Sie unter Zugriff auf Projekte, Ordner und Organisationen verwalten.

Sie können die erforderlichen Berechtigungen auch über benutzerdefinierte Rollen oder andere vordefinierte Rollen erhalten.

UDF erstellen

Erstellen Sie eine UDF, die einen BigQuery-Datentyp STRUCT in eine Protobuf-Spalte konvertiert:

  1. Klonen Sie in einer Befehlszeile das Repository bigquery-utils.git:

    git clone https://github.com/GoogleCloudPlatform/bigquery-utils.git
    
  2. Rufen Sie den Protobuf-Exportordner auf:

    cd bigquery-utils/tools/protobuf_export
    
  3. Verwenden Sie den Befehl cp oder den Dateibrowser Ihres Betriebssystems, um die Proto-Datei in den untergeordneten Ordner ./protos zu kopieren.

    Im Ordner ./protos befindet sich bereits eine Prototypdatei mit dem Namen dummy.proto.

  4. Installieren Sie die erforderlichen Pakete aus dem GitHub-Repository:

    npm install
    
  5. Verpacken Sie das Paket mit Webpack:

    npx webpack --config webpack.config.js --stats-error-details
    
  6. Suchen Sie im untergeordneten Ordner ./dist nach der Datei pbwrapper.js und laden Sie die Datei in einen Cloud Storage-Bucket hoch.

  7. Rufen Sie die Seite BigQuery auf.

    BigQuery aufrufen

  8. Erstellen Sie mit dem Abfrageeditor eine UDF mit dem Namen toMyProtoMessage, die aus vorhandenen BigQuery-Tabellenspalten eine Protobuf-Spalte erstellt:

    CREATE FUNCTION
      DATASET_ID.toMyProtoMessage(input STRUCT<INPUT_FIELDS>)
      RETURNS BYTES
        LANGUAGE js OPTIONS ( library=["gs://BUCKET_NAME/pbwrapper.js"]
    ) AS r"""
    let message = pbwrapper.setup("PROTO_PACKAGE.PROTO_MESSAGE")
    return pbwrapper.parse(message, input)
      """;
    

    Ersetzen Sie Folgendes:

    • DATASET_ID: die ID des Datasets, das die UDF enthalten soll.
    • INPUT_FIELDS: die Felder, die im Proto-Nachrichtentyp für die Proto-Datei verwendet werden, im Format field_name_1 field_type_1 [, field_name_2 field_type_2, ...].

      Sie müssen alle Felder für Nachrichtentypen, in denen Unterstriche verwendet werden, in Kamelschrift umwandeln. Wenn der Nachrichtentyp beispielsweise so aussieht, muss der Wert des Eingabefelds itemId int64, itemDescription string sein:

      message ThisMessage {
        int64 item_id = 1;
        string item_description = 2;
      }
      
    • BUCKET_NAME: der Name des Cloud Storage-Buckets, der die Datei pbwrapper.js enthält.

    • PROTO_PACKAGE: das Paket für die Proto-Datei.

    • PROTO_MESSAGE: der Nachrichtentyp für die Proto-Datei.

    Wenn Sie beispielsweise die bereitgestellte dummy.proto-Datei verwenden, sieht die CREATE FUNCTION-Anweisung so aus:

    CREATE OR REPLACE FUNCTION
      mydataset.toMyProtoMessage(input STRUCT<dummyField STRING>)
      RETURNS BYTES
        LANGUAGE js OPTIONS ( library=["gs://mybucket/pbwrapper.js"]
    ) AS r"""
    let message = pbwrapper.setup("dummypackage.DummyMessage")
    return pbwrapper.parse(message, input)
      """;
    

Spalten als Protobuf-Werte formatieren

Führen Sie die toMyProtoMessage-UDF aus, um BigQuery-Tabellenspalten als Protobuf-Werte zu formatieren:

  SELECT
    UDF_DATASET_ID.toMyProtoMessage(STRUCT(INPUT_COLUMNS)) AS protoResult
  FROM
    `PROJECT_ID.DATASET_ID.TABLE_NAME`
  LIMIT
    100;

Ersetzen Sie Folgendes:

  • UDF_DATASET_ID: die ID des Datasets, das die UDF enthält.
  • INPUT_COLUMNS: Die Namen der Spalten, die als Protobuf-Wert formatiert werden sollen, im Format column_name_1 [, column_name_2, ...]. Spalten können alle unterstützten skalaren Werttypen oder nicht skalaren Typen enthalten, einschließlich ARRAY und STRUCT. Die Eingabespalten müssen dem Typ und der Anzahl der Felder des Proto-Nachrichtentyps entsprechen.
  • PROJECT_ID: die ID des Projekts, das die Tabelle enthält. Sie können die Projektidentifikation überspringen, wenn sich das Dataset in Ihrem aktuellen Projekt befindet.
  • DATASET_ID: die ID des Datasets, das die Tabelle enthält.
  • TABLE_NAME: Der Name der Tabelle, die die zu formatierenden Spalten enthält.

Wenn Sie beispielsweise eine toMyProtoMessage-UDF auf der Grundlage von dummy.proto verwenden, funktioniert die folgende SELECT-Anweisung:

SELECT
  mydataset.toMyProtoMessage(STRUCT(word)) AS protoResult
FROM
  `bigquery-public-data.samples.shakespeare`
LIMIT 100;

Mit Protobuf-Werten arbeiten

Wenn Ihre BigQuery-Daten im Protobuf-Format exportiert wurden, können Sie jetzt mit den Daten als vollständig typisiertes Objekt oder Struct arbeiten.

Die folgenden Codebeispiele enthalten mehrere Beispiele für die Verarbeitung oder Verwendung der exportierten Daten:

Go

// package Main queries Google BigQuery.
package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"

	"cloud.google.com/go/bigquery"
	"google.golang.org/api/iterator"
	"google.golang.org/Protobuf/proto"

	pb "path/to/proto/file_proto"
)

const (
	projectID = "your-project-id"
)

// Row contains returned row data from bigquery.
type Row struct {
	RowKey string `bigquery:"RowKey"`
	Proto  []byte `bigquery:"ProtoResult"`
}

func main() {
	ctx := context.Background()

	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		log.Fatalf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	rows, err := query(ctx, client)
	if err != nil {
		log.Fatal(err)
	}
	if err := printResults(os.Stdout, rows); err != nil {
		log.Fatal(err)
	}
}

// query returns a row iterator suitable for reading query results.
func query(ctx context.Context, client *bigquery.Client) (*bigquery.RowIterator, error) {

	query := client.Query(
		`SELECT 
  concat(word, ":", corpus) as RowKey, 
  <dataset-id>.toMyProtoMessage(
    STRUCT(
      word, 
      CAST(word_count AS BIGNUMERIC)
    )
  ) AS ProtoResult 
FROM 
  ` + "` bigquery - public - data.samples.shakespeare `" + ` 
LIMIT 
  100;
`)
	return query.Read(ctx)
}

// printResults prints results from a query.
func printResults(w io.Writer, iter *bigquery.RowIterator) error {
	for {
		var row Row
		err := iter.Next(&row)
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return fmt.Errorf("error iterating through results: %w", err)
		}
		message := &pb.TestMessage{}
		if err = proto.Unmarshal(row.Proto, message); err != nil {
			return err
		}
		fmt.Fprintf(w, "rowKey: %s, message: %v\n", row.RowKey, message)
	}
}

Java

package proto;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import path.to.proto.TestMessage;
import java.util.UUID;

/** Queries Google BigQuery */
public final class Main {
  public static void main(String[] args) throws Exception {
    String projectId = "your-project-id";
    BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(projectId).build().getService();

    QueryJobConfiguration queryConfig =
        QueryJobConfiguration.newBuilder(
                " SELECT "
                    + "concat(word , \":\",corpus) as RowKey,"
                    + "<dataset-id>.toMyProtoMessage(STRUCT(word, "
                    + "CAST(word_count AS BIGNUMERIC))) AS ProtoResult "
                    + "FROM "
                    + "`bigquery-public-data.samples.shakespeare` "
                    + "ORDER BY word_count DESC "
                    + "LIMIT 20")
            .setUseLegacySql(false)
            .build();

    // Create a job ID so that we can safely retry.
    JobId jobId = JobId.of(UUID.randomUUID().toString());
    Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

    // Wait for the query to complete.
    queryJob = queryJob.waitFor();

    // Check for errors
    if (queryJob == null) {
      throw new RuntimeException("Job no longer exists");
    } else if (queryJob.getStatus().getError() != null) {
      // You can also look at queryJob.getStatus().getExecutionErrors() for all
      // errors, not just the latest one.
      throw new RuntimeException(queryJob.getStatus().getError().toString());
    }

    // Get the results.
    TableResult result = queryJob.getQueryResults();

    // Print all pages of the results.
    for (FieldValueList row : result.iterateAll()) {
      String key = row.get("RowKey").getStringValue();
      byte[] message = row.get("ProtoResult").getBytesValue();
      TestMessage testMessage = TestMessage.parseFrom(message);
      System.out.printf("rowKey: %s, message: %s\n", key, testMessage);
    }
  }
}

Python

"""Queries Google BigQuery."""

from google.cloud import bigquery
from path.to.proto import awesome_pb2


def main():
  project_id = "your-project-id"
  client = bigquery.Client(project=project_id)
  query_job = client.query(query="""
               SELECT
			concat(word , ":",corpus) as RowKey,
			<dataset-id>.toMyProtoMessage(
			    STRUCT(
			      word, 
			      CAST(word_count AS BIGNUMERIC)
			    )
			  ) AS ProtoResult 
		FROM
				  `bigquery-public-data.samples.shakespeare`
		ORDER BY word_count DESC
		LIMIT 20
    """)
  rows = query_job.result()
  for row in rows:
    message = awesome_pb2.TestMessage()
    message.ParseFromString(row.get("ProtoResult"))
    print(
        "rowKey: {}, message: {}".format(row.get("RowKey"), message)
    )