Esportare i dati come colonne Protobuf

Questo documento descrive come esportare i dati BigQuery come colonne Protocol Buffers (Protobuf) utilizzando le funzioni definite dall'utente (UDF) di BigQuery.

Quando utilizzare le colonne Protobuf

BigQuery offre una serie di funzioni integrate per formattare i dati selezionati. Un'opzione è unire più valori di colonna in un unico valore Protobuf, il che offre i seguenti vantaggi:

  • Sicurezza dei tipi di oggetti.
  • Compressione, tempi di trasferimento dei dati e costi migliorati rispetto a JSON.
  • Flessibilità, in quanto la maggior parte dei linguaggi di programmazione dispone di librerie per la gestione di Protobuf.
  • Minore overhead durante la lettura da più colonne e la creazione di un singolo oggetto.

Sebbene anche altri tipi di colonne possano fornire la sicurezza dei tipi, l'utilizzo delle colonne Protobuf fornisce un oggetto completamente tipizzato, il che può ridurre la quantità di lavoro da svolgere nel livello applicazione o in un'altra parte della pipeline.

Tuttavia, esistono limitazioni all'esportazione dei dati BigQuery come colonne Protobuf:

  • Le colonne Protobuf non sono ben indicizzate o filtrate. La ricerca in base al contenuto delle colonne Protobuf può essere meno efficace.
  • Ordinare i dati in formato Protobuf può essere difficile.

Se queste limitazioni si applicano al flusso di lavoro di esportazione, puoi prendere in considerazione altri metodi di esportazione dei dati BigQuery:

  • Utilizza le query pianificate con le istruzioni EXPORT DATA per ordinare i dati BigQuery esportati per data o ora e per pianificare le esportazioni su base ricorrente. BigQuery supporta l'esportazione dei dati nei formati Avro, CSV, JSON e Parquet.
  • Utilizza Dataflow per esportare i dati BigQuery in formato Avro o CSV.

Ruoli obbligatori

Per ottenere le autorizzazioni necessarie per esportare i dati BigQuery come colonne Protobuf, chiedi all'amministratore di concederti i seguenti ruoli IAM nel progetto:

Per saperne di più sulla concessione dei ruoli, consulta Gestisci l'accesso a progetti, cartelle e organizzazioni.

Potresti anche riuscire a ottenere le autorizzazioni richieste tramite i ruoli personalizzati o altri ruoli predefiniti.

Crea una funzione definita dall'utente

Crea una funzione definita dall'utente che converte un tipo di dati STRUCT BigQuery in una colonna Protobuf:

  1. In una riga di comando, clona il repository bigquery-utils.git:

    git clone https://github.com/GoogleCloudPlatform/bigquery-utils.git
    
  2. Vai alla cartella di esportazione di Protobuf:

    cd bigquery-utils/tools/protobuf_export
    
  3. Utilizza il comando cp o il browser di file del sistema operativo per copiare il file proto nella cartella secondaria ./protos.

    Nella cartella ./protos è già presente un file proto di esempio denominato dummy.proto.

  4. Installa i pacchetti necessari dal repository GitHub:

    npm install
    
  5. Raggruppa il pacchetto utilizzando webpack:

    npx webpack --config webpack.config.js --stats-error-details
    
  6. Individua il file pbwrapper.js nella cartella secondaria ./dist, quindi carica il file in un bucket Cloud Storage.

  7. Vai alla pagina BigQuery.

    Vai a BigQuery

  8. Utilizzando l'editor di query, crea una UDF denominata toMyProtoMessage che crea una colonna Protobuf dalle colonne della tabella BigQuery esistenti:

    CREATE FUNCTION
      DATASET_ID.toMyProtoMessage(input STRUCT<INPUT_FIELDS>)
      RETURNS BYTES
        LANGUAGE js OPTIONS ( library=["gs://BUCKET_NAME/pbwrapper.js"]
    ) AS r"""
    let message = pbwrapper.setup("PROTO_PACKAGE.PROTO_MESSAGE")
    return pbwrapper.parse(message, input)
      """;
    

    Sostituisci quanto segue:

    • DATASET_ID: l'ID del set di dati che deve contenere la UDF.
    • INPUT_FIELDS: i campi utilizzati nel tipo di messaggio proto per il file proto, nel formato field_name_1 field_type_1 [, field_name_2 field_type_2, ...].

      Devi tradurre tutti i campi del tipo di messaggio che utilizzano i trattini bassi in modo che utilizzino camel case. Ad esempio, se il tipo di messaggio è simile al seguente, il valore dei campi di input deve essere itemId int64, itemDescription string:

      message ThisMessage {
        int64 item_id = 1;
        string item_description = 2;
      }
      
    • BUCKET_NAME: il nome del bucket Cloud Storage che contiene il file pbwrapper.js.

    • PROTO_PACKAGE: il pacchetto per il file proto.

    • PROTO_MESSAGE: il tipo di messaggio per il file proto.

    Ad esempio, se utilizzi il file dummy.proto fornito, l'istruzione CREATE FUNCTION ha il seguente aspetto:

    CREATE OR REPLACE FUNCTION
      mydataset.toMyProtoMessage(input STRUCT<dummyField STRING>)
      RETURNS BYTES
        LANGUAGE js OPTIONS ( library=["gs://mybucket/pbwrapper.js"]
    ) AS r"""
    let message = pbwrapper.setup("dummypackage.DummyMessage")
    return pbwrapper.parse(message, input)
      """;
    

Formattare le colonne come valori Protobuf

Esegui la funzione definita dall'utente toMyProtoMessage per formattare le colonne della tabella BigQuery come valori Protobuf:

  SELECT
    UDF_DATASET_ID.toMyProtoMessage(STRUCT(INPUT_COLUMNS)) AS protoResult
  FROM
    `PROJECT_ID.DATASET_ID.TABLE_NAME`
  LIMIT
    100;

Sostituisci quanto segue:

  • UDF_DATASET_ID: l'ID del set di dati che contiene la UDF.
  • INPUT_COLUMNS: i nomi delle colonne da formattare come valore Protobuf, nel formato column_name_1 [, column_name_2, ...]. Le colonne possono essere di qualsiasi tipo di valore scalare o di tipo non scalare, inclusi ARRAY e STRUCT. Le colonne di input devono corrispondere al tipo e al numero di campi del tipo di messaggio proto.
  • PROJECT_ID: l'ID del progetto che contiene la tabella. Puoi saltare l'identificazione del progetto se il set di dati si trova nel progetto attuale.
  • DATASET_ID: l'ID del set di dati che contiene la tabella.
  • TABLE_NAME: il nome della tabella che contiene le colonne da formattare.

Ad esempio, se utilizzi una UDF toMyProtoMessage basata su dummy.proto, la seguente istruzione SELECT funziona:

SELECT
  mydataset.toMyProtoMessage(STRUCT(word)) AS protoResult
FROM
  `bigquery-public-data.samples.shakespeare`
LIMIT 100;

Utilizzare i valori Protobuf

Con i dati BigQuery esportati in formato Protobuf, ora puoi utilizzarli come oggetto o struct completamente tipizzato.

I seguenti esempi di codice forniscono diversi modi per elaborare o utilizzare i dati esportati:

Vai

// package Main queries Google BigQuery.
package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"

	"cloud.google.com/go/bigquery"
	"google.golang.org/api/iterator"
	"google.golang.org/Protobuf/proto"

	pb "path/to/proto/file_proto"
)

const (
	projectID = "your-project-id"
)

// Row contains returned row data from bigquery.
type Row struct {
	RowKey string `bigquery:"RowKey"`
	Proto  []byte `bigquery:"ProtoResult"`
}

func main() {
	ctx := context.Background()

	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		log.Fatalf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	rows, err := query(ctx, client)
	if err != nil {
		log.Fatal(err)
	}
	if err := printResults(os.Stdout, rows); err != nil {
		log.Fatal(err)
	}
}

// query returns a row iterator suitable for reading query results.
func query(ctx context.Context, client *bigquery.Client) (*bigquery.RowIterator, error) {

	query := client.Query(
		`SELECT 
  concat(word, ":", corpus) as RowKey, 
  <dataset-id>.toMyProtoMessage(
    STRUCT(
      word, 
      CAST(word_count AS BIGNUMERIC)
    )
  ) AS ProtoResult 
FROM 
  ` + "` bigquery - public - data.samples.shakespeare `" + ` 
LIMIT 
  100;
`)
	return query.Read(ctx)
}

// printResults prints results from a query.
func printResults(w io.Writer, iter *bigquery.RowIterator) error {
	for {
		var row Row
		err := iter.Next(&row)
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return fmt.Errorf("error iterating through results: %w", err)
		}
		message := &pb.TestMessage{}
		if err = proto.Unmarshal(row.Proto, message); err != nil {
			return err
		}
		fmt.Fprintf(w, "rowKey: %s, message: %v\n", row.RowKey, message)
	}
}

Java

package proto;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import path.to.proto.TestMessage;
import java.util.UUID;

/** Queries Google BigQuery */
public final class Main {
  public static void main(String[] args) throws Exception {
    String projectId = "your-project-id";
    BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(projectId).build().getService();

    QueryJobConfiguration queryConfig =
        QueryJobConfiguration.newBuilder(
                " SELECT "
                    + "concat(word , \":\",corpus) as RowKey,"
                    + "<dataset-id>.toMyProtoMessage(STRUCT(word, "
                    + "CAST(word_count AS BIGNUMERIC))) AS ProtoResult "
                    + "FROM "
                    + "`bigquery-public-data.samples.shakespeare` "
                    + "ORDER BY word_count DESC "
                    + "LIMIT 20")
            .setUseLegacySql(false)
            .build();

    // Create a job ID so that we can safely retry.
    JobId jobId = JobId.of(UUID.randomUUID().toString());
    Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

    // Wait for the query to complete.
    queryJob = queryJob.waitFor();

    // Check for errors
    if (queryJob == null) {
      throw new RuntimeException("Job no longer exists");
    } else if (queryJob.getStatus().getError() != null) {
      // You can also look at queryJob.getStatus().getExecutionErrors() for all
      // errors, not just the latest one.
      throw new RuntimeException(queryJob.getStatus().getError().toString());
    }

    // Get the results.
    TableResult result = queryJob.getQueryResults();

    // Print all pages of the results.
    for (FieldValueList row : result.iterateAll()) {
      String key = row.get("RowKey").getStringValue();
      byte[] message = row.get("ProtoResult").getBytesValue();
      TestMessage testMessage = TestMessage.parseFrom(message);
      System.out.printf("rowKey: %s, message: %s\n", key, testMessage);
    }
  }
}

Python

"""Queries Google BigQuery."""

from google.cloud import bigquery
from path.to.proto import awesome_pb2


def main():
  project_id = "your-project-id"
  client = bigquery.Client(project=project_id)
  query_job = client.query(query="""
               SELECT
			concat(word , ":",corpus) as RowKey,
			<dataset-id>.toMyProtoMessage(
			    STRUCT(
			      word, 
			      CAST(word_count AS BIGNUMERIC)
			    )
			  ) AS ProtoResult 
		FROM
				  `bigquery-public-data.samples.shakespeare`
		ORDER BY word_count DESC
		LIMIT 20
    """)
  rows = query_job.result()
  for row in rows:
    message = awesome_pb2.TestMessage()
    message.ParseFromString(row.get("ProtoResult"))
    print(
        "rowKey: {}, message: {}".format(row.get("RowKey"), message)
    )