Exportar dados como colunas Protobuf

Neste documento, descrevemos como exportar dados do BigQuery como colunas de buffers de protocolo (Protobuf) usando funções definidas pelo usuário (UDFs, na sigla em inglês) do BigQuery.

Quando usar as colunas do Protobuf

O BigQuery oferece várias funções integradas para formatar os dados selecionados. Uma opção é mesclar vários valores de coluna em um único valor de Protobuf, o que tem os seguintes benefícios:

  • Segurança de tipo de objeto
  • Compactação, tempo de transferência de dados e custo aprimorados em comparação com o JSON.
  • Flexibilidade, já que a maioria das linguagens de programação tem bibliotecas para lidar com Protobuf.
  • Menos sobrecarga ao ler de várias colunas e criar um único objeto.

Enquanto outros tipos de coluna também podem fornecer segurança de tipo, o uso de colunas Protobuf fornece um objeto totalmente tipado, o que pode reduzir a quantidade de trabalho que precisa ser feita na camada do aplicativo ou em outra parte do pipeline.

No entanto, há limitações para exportar dados do BigQuery como colunas Protobuf:

  • As colunas protobuf não são bem indexadas ou filtradas. Pesquisar pelo conteúdo das colunas do Protobuf pode ser menos eficaz.
  • Classificar dados no formato Protobuf pode ser difícil.

Se essas limitações se aplicarem ao fluxo de trabalho de exportação, considere outros métodos de exportação de dados do BigQuery:

  • Use consultas programadas com instruções EXPORT DATA para classificar os dados exportados do BigQuery por data ou hora e programar exportações de maneira recorrente. O BigQuery oferece suporte à exportação de dados nos formatos Avro, CSV, JSON e Parquet.
  • Use o Dataflow para exportar dados do BigQuery nos formatos de arquivo Avro ou CSV.

Funções exigidas

Para conseguir as permissões necessárias para exportar dados do BigQuery como colunas do Protobuf, peça ao administrador para conceder a você os seguintes papéis do IAM no seu projeto:

Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Também é possível conseguir as permissões necessárias por meio de papéis personalizados ou de outros papéis predefinidos.

Criar uma UDF

Crie uma UDF que converta um tipo de dados STRUCT do BigQuery em uma coluna Protobuf:

  1. Em uma linha de comando, clone o repositório bigquery-utils.git:

    git clone https://github.com/GoogleCloudPlatform/bigquery-utils.git
    
  2. Navegue até a pasta de exportação do Protobuf:

    cd bigquery-utils/tools/protobuf_export
    
  3. Use o comando cp ou o navegador de arquivos do sistema operacional para copiar o arquivo proto para a pasta filha ./protos.

    Já há um arquivo de exemplo de proto chamado dummy.proto na pasta ./protos.

  4. Instale os pacotes necessários no repositório do GitHub:

    npm install
    
  5. Agrupe o pacote usando o webpack:

    npx webpack --config webpack.config.js --stats-error-details
    
  6. Localize o arquivo pbwrapper.js na pasta filha ./dist e, em seguida, faça upload do arquivo para um bucket do Cloud Storage.

  7. Acessar a página do BigQuery.

    Ir para o BigQuery

  8. Usando o editor de consultas, crie uma UDF chamada toMyProtoMessage que crie uma coluna Protobuf com base nas colunas da tabela do BigQuery:

    CREATE FUNCTION
      DATASET_ID.toMyProtoMessage(input STRUCT<INPUT_FIELDS>)
      RETURNS BYTES
        LANGUAGE js OPTIONS ( library=["gs://BUCKET_NAME/pbwrapper.js"]
    ) AS r"""
    let message = pbwrapper.setup("PROTO_PACKAGE.PROTO_MESSAGE")
    return pbwrapper.parse(message, input)
      """;
    

    Substitua:

    • DATASET_ID: o ID do conjunto de dados que vai conter a UDF.
    • INPUT_FIELDS: os campos usados no tipo de mensagem do proto para o arquivo proto, no formato field_name_1 field_type_1 [, field_name_2 field_type_2, ...].

      É necessário traduzir todos os campos de tipo de mensagem que usam sublinhados para usar camel case. Por exemplo, se o tipo de mensagem for semelhante ao seguinte, o valor dos campos de entrada precisará ser itemId int64, itemDescription string:

      message ThisMessage {
        int64 item_id = 1;
        string item_description = 2;
      }
      
    • BUCKET_NAME: o nome do bucket do Cloud Storage que contém o arquivo pbwrapper.js.

    • PROTO_PACKAGE: o pacote do arquivo proto.

    • PROTO_MESSAGE: o tipo de mensagem do arquivo proto.

    Por exemplo, se você usar o arquivo dummy.proto fornecido, a instrução CREATE FUNCTION vai ficar assim:

    CREATE OR REPLACE FUNCTION
      mydataset.toMyProtoMessage(input STRUCT<dummyField STRING>)
      RETURNS BYTES
        LANGUAGE js OPTIONS ( library=["gs://mybucket/pbwrapper.js"]
    ) AS r"""
    let message = pbwrapper.setup("dummypackage.DummyMessage")
    return pbwrapper.parse(message, input)
      """;
    

Formatar colunas como valores Protobuf

Execute a UDF toMyProtoMessage para formatar as colunas de tabela do BigQuery como valores Protobuf:

  SELECT
    UDF_DATASET_ID.toMyProtoMessage(STRUCT(INPUT_COLUMNS)) AS protoResult
  FROM
    `PROJECT_ID.DATASET_ID.TABLE_NAME`
  LIMIT
    100;

Substitua:

  • UDF_DATASET_ID: o ID do conjunto de dados que contém a UDF.
  • INPUT_COLUMNS: os nomes das colunas a serem formatadas como um valor Protobuf, no formato column_name_1 [, column_name_2, ...]. As colunas podem ser de qualquer tipo de valor escalar ou não escalar compatível, incluindo ARRAY e STRUCT. As colunas de entrada precisam corresponder ao tipo e ao número de campos de tipo de mensagem proto.
  • PROJECT_ID: o ID do projeto que contém a tabela. Você pode pular a identificação do projeto se o conjunto de dados estiver no projeto atual.
  • DATASET_ID: o ID do conjunto de dados que contém a tabela.
  • TABLE_NAME: o nome da tabela que contém as colunas a serem formatadas.

Por exemplo, se você usar uma UDF toMyProtoMessage baseada em dummy.proto, a instrução SELECT a seguir vai funcionar:

SELECT
  mydataset.toMyProtoMessage(STRUCT(word)) AS protoResult
FROM
  `bigquery-public-data.samples.shakespeare`
LIMIT 100;

Trabalhar com valores Protobuf

Com os dados do BigQuery exportados no formato Protobuf, agora é possível trabalhar com os dados como um objeto ou estrutura totalmente tipado.

Os exemplos de código a seguir fornecem vários exemplos de maneiras de processar ou trabalhar com os dados exportados:

Go

// package Main queries Google BigQuery.
package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"

	"cloud.google.com/go/bigquery"
	"google.golang.org/api/iterator"
	"google.golang.org/Protobuf/proto"

	pb "path/to/proto/file_proto"
)

const (
	projectID = "your-project-id"
)

// Row contains returned row data from bigquery.
type Row struct {
	RowKey string `bigquery:"RowKey"`
	Proto  []byte `bigquery:"ProtoResult"`
}

func main() {
	ctx := context.Background()

	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		log.Fatalf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	rows, err := query(ctx, client)
	if err != nil {
		log.Fatal(err)
	}
	if err := printResults(os.Stdout, rows); err != nil {
		log.Fatal(err)
	}
}

// query returns a row iterator suitable for reading query results.
func query(ctx context.Context, client *bigquery.Client) (*bigquery.RowIterator, error) {

	query := client.Query(
		`SELECT 
  concat(word, ":", corpus) as RowKey, 
  <dataset-id>.toMyProtoMessage(
    STRUCT(
      word, 
      CAST(word_count AS BIGNUMERIC)
    )
  ) AS ProtoResult 
FROM 
  ` + "` bigquery - public - data.samples.shakespeare `" + ` 
LIMIT 
  100;
`)
	return query.Read(ctx)
}

// printResults prints results from a query.
func printResults(w io.Writer, iter *bigquery.RowIterator) error {
	for {
		var row Row
		err := iter.Next(&row)
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return fmt.Errorf("error iterating through results: %w", err)
		}
		message := &pb.TestMessage{}
		if err = proto.Unmarshal(row.Proto, message); err != nil {
			return err
		}
		fmt.Fprintf(w, "rowKey: %s, message: %v\n", row.RowKey, message)
	}
}

Java

package proto;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import path.to.proto.TestMessage;
import java.util.UUID;

/** Queries Google BigQuery */
public final class Main {
  public static void main(String[] args) throws Exception {
    String projectId = "your-project-id";
    BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(projectId).build().getService();

    QueryJobConfiguration queryConfig =
        QueryJobConfiguration.newBuilder(
                " SELECT "
                    + "concat(word , \":\",corpus) as RowKey,"
                    + "<dataset-id>.toMyProtoMessage(STRUCT(word, "
                    + "CAST(word_count AS BIGNUMERIC))) AS ProtoResult "
                    + "FROM "
                    + "`bigquery-public-data.samples.shakespeare` "
                    + "ORDER BY word_count DESC "
                    + "LIMIT 20")
            .setUseLegacySql(false)
            .build();

    // Create a job ID so that we can safely retry.
    JobId jobId = JobId.of(UUID.randomUUID().toString());
    Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

    // Wait for the query to complete.
    queryJob = queryJob.waitFor();

    // Check for errors
    if (queryJob == null) {
      throw new RuntimeException("Job no longer exists");
    } else if (queryJob.getStatus().getError() != null) {
      // You can also look at queryJob.getStatus().getExecutionErrors() for all
      // errors, not just the latest one.
      throw new RuntimeException(queryJob.getStatus().getError().toString());
    }

    // Get the results.
    TableResult result = queryJob.getQueryResults();

    // Print all pages of the results.
    for (FieldValueList row : result.iterateAll()) {
      String key = row.get("RowKey").getStringValue();
      byte[] message = row.get("ProtoResult").getBytesValue();
      TestMessage testMessage = TestMessage.parseFrom(message);
      System.out.printf("rowKey: %s, message: %s\n", key, testMessage);
    }
  }
}

Python

"""Queries Google BigQuery."""

from google.cloud import bigquery
from path.to.proto import awesome_pb2


def main():
  project_id = "your-project-id"
  client = bigquery.Client(project=project_id)
  query_job = client.query(query="""
               SELECT
			concat(word , ":",corpus) as RowKey,
			<dataset-id>.toMyProtoMessage(
			    STRUCT(
			      word, 
			      CAST(word_count AS BIGNUMERIC)
			    )
			  ) AS ProtoResult 
		FROM
				  `bigquery-public-data.samples.shakespeare`
		ORDER BY word_count DESC
		LIMIT 20
    """)
  rows = query_job.result()
  for row in rows:
    message = awesome_pb2.TestMessage()
    message.ParseFromString(row.get("ProtoResult"))
    print(
        "rowKey: {}, message: {}".format(row.get("RowKey"), message)
    )