Exporta datos como columnas de Protobuf

En este documento, se describe cómo puedes exportar datos de BigQuery como columnas de búferes de protocolo (Protobuf) mediante funciones definidas por el usuario (UDF) de BigQuery.

Cuándo usar columnas de Protobuf

BigQuery ofrece una variedad de funciones integradas para dar formato a los datos seleccionados. Una opción es combinar varios valores de columna en un solo valor de Protobuf, lo que brinda los siguientes beneficios:

  • Seguridad de tipo de objeto
  • Compresión mejorada, tiempo de transferencia de datos y costo en comparación con JSON
  • Flexibilidad, ya que la mayoría de los lenguajes de programación tienen bibliotecas para manejar Protobuf
  • Menos sobrecarga cuando se lee desde varias columnas y se compila un solo objeto

Si bien otros tipos de columna también pueden proporcionar seguridad de tipo, el uso de columnas de Protobuf proporciona un objeto de tipo completo, lo que puede reducir la cantidad de trabajo que se debe realizar en la capa de la aplicación o en otra parte de la canalización.

Sin embargo, existen limitaciones para exportar datos de BigQuery como columnas de Protobuf:

  • Las columnas de Protobuf no están bien indexadas ni filtradas. La búsqueda mediante el contenido de las columnas de Protobuf puede ser menos eficaz.
  • Ordenar datos en formato Protobuf puede ser difícil.

Si estas limitaciones se aplican al flujo de trabajo de exportación, puedes considerar otros métodos para exportar datos de BigQuery:

  • Usa las consultas programadas con sentencias EXPORT DATA para ordenar los datos exportados de BigQuery por fecha o hora, y programar las exportaciones de forma recurrente. BigQuery es compatible con la exportación de datos a los formatos Avro, CSV, JSON y Parquet.
  • Usa Dataflow para exportar datos de BigQuery en formato de archivo Avro o CSV.

Roles obligatorios

Para obtener los permisos que necesitas para exportar los datos de BigQuery como columnas de Protobuf, pídele a tu administrador que te otorgue los siguientes roles de IAM en tu proyecto:

Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

También puedes obtener los permisos necesarios mediante roles personalizados o cualquier otro rol predefinido.

Crea una UDF

Crea una UDF que convierta un tipo de datos STRUCT de BigQuery en una columna de Protobuf:

  1. En una línea de comandos, clona el repositorio bigquery-utils.git.

    git clone https://github.com/GoogleCloudPlatform/bigquery-utils.git
    
  2. Navega a la carpeta de exportación de Protobuf:

    cd bigquery-utils/tools/protobuf_export
    
  3. Usa el comando cp o el navegador de archivos de tu sistema operativo para copiar el archivo proto a la carpeta secundaria ./protos.

    Ya hay un archivo proto de muestra llamado dummy.proto en la carpeta ./protos.

  4. Instala los paquetes necesarios desde el repositorio de GitHub:

    npm install
    
  5. Empaqueta el paquete mediante webpack:

    npx webpack --config webpack.config.js --stats-error-details
    
  6. Busca el archivo pbwrapper.js en la carpeta secundaria ./dist y, luego, súbelo a un bucket de Cloud Storage.

  7. Ve a la página de BigQuery.

    Ir a BigQuery

  8. Con el editor de consultas, crea una UDF llamada toMyProtoMessage que compile una columna Protobuf a partir de las columnas de la tabla de BigQuery existentes:

    CREATE FUNCTION
      DATASET_ID.toMyProtoMessage(input STRUCT<INPUT_FIELDS>)
      RETURNS BYTES
        LANGUAGE js OPTIONS ( library=["gs://BUCKET_NAME/pbwrapper.js"]
    ) AS r"""
    let message = pbwrapper.setup("PROTO_PACKAGE.PROTO_MESSAGE")
    return pbwrapper.parse(message, input)
      """;
    

    Reemplaza lo siguiente:

    • DATASET_ID: El ID del conjunto de datos que contendrá la UDF.
    • INPUT_FIELDS: Son los campos que se usan en el tipo de mensaje de proto para el archivo proto, en el formato field_name_1 field_type_1 [, field_name_2 field_type_2, ...].

      Debes traducir todos los campos de tipo de mensaje que usen guiones bajos para usar mayúsculas y minúsculas en su lugar. Por ejemplo, si el tipo de mensaje se ve de la siguiente manera, el valor de los campos de entrada debe ser itemId int64, itemDescription string:

      message ThisMessage {
        int64 item_id = 1;
        string item_description = 2;
      }
      
    • BUCKET_NAME: Es el nombre del bucket de Cloud Storage que contiene el archivo pbwrapper.js.

    • PROTO_PACKAGE: Es el paquete del archivo proto.

    • PROTO_MESSAGE: Es el tipo de mensaje del archivo proto.

    Por ejemplo, si usas el archivo dummy.proto proporcionado, la sentencia CREATE FUNCTION se verá de la siguiente manera:

    CREATE OR REPLACE FUNCTION
      mydataset.toMyProtoMessage(input STRUCT<dummyField STRING>)
      RETURNS BYTES
        LANGUAGE js OPTIONS ( library=["gs://mybucket/pbwrapper.js"]
    ) AS r"""
    let message = pbwrapper.setup("dummypackage.DummyMessage")
    return pbwrapper.parse(message, input)
      """;
    

Dales formato a las columnas como valores de Protobuf

Ejecuta la UDF toMyProtoMessage para dar formato a las columnas de la tabla de BigQuery como valores de Protobuf:

  SELECT
    UDF_DATASET_ID.toMyProtoMessage(STRUCT(INPUT_COLUMNS)) AS protoResult
  FROM
    `PROJECT_ID.DATASET_ID.TABLE_NAME`
  LIMIT
    100;

Reemplaza lo siguiente:

  • UDF_DATASET_ID: El ID del conjunto de datos que contiene la UDF.
  • INPUT_COLUMNS: Los nombres de las columnas que se formatearán como un valor de Protobuf, en el formato column_name_1 [, column_name_2, ...]. Las columnas pueden ser de cualquier tipo de valor escalar o no escalar compatible, incluidos ARRAY y STRUCT. Las columnas de entrada deben coincidir con el tipo y la cantidad de campos de tipo de mensaje de proto.
  • PROJECT_ID: el ID del proyecto que contiene la tabla. Puedes omitir la identificación del proyecto si el conjunto de datos está en tu proyecto actual.
  • DATASET_ID: Es el ID del conjunto de datos que contiene la tabla.
  • TABLE_NAME: Es el nombre de la tabla que contiene las columnas que se van a dar formato.

Por ejemplo, si usas una UDF toMyProtoMessage basada en dummy.proto, funciona la siguiente sentencia SELECT:

SELECT
  mydataset.toMyProtoMessage(STRUCT(word)) AS protoResult
FROM
  `bigquery-public-data.samples.shakespeare`
LIMIT 100;

Trabaja con valores de Protobuf

Con los datos de BigQuery exportados en formato Protobuf, ahora puedes trabajar con los datos como un objeto o una struct con una definición completa de tipo.

En las siguientes muestras de código, se proporcionan varios ejemplos de formas en las que puedes procesar o trabajar con los datos exportados:

Go

// package Main queries Google BigQuery.
package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"os"

	"cloud.google.com/go/bigquery"
	"google.golang.org/api/iterator"
	"google.golang.org/Protobuf/proto"

	pb "path/to/proto/file_proto"
)

const (
	projectID = "your-project-id"
)

// Row contains returned row data from bigquery.
type Row struct {
	RowKey string `bigquery:"RowKey"`
	Proto  []byte `bigquery:"ProtoResult"`
}

func main() {
	ctx := context.Background()

	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		log.Fatalf("bigquery.NewClient: %v", err)
	}
	defer client.Close()

	rows, err := query(ctx, client)
	if err != nil {
		log.Fatal(err)
	}
	if err := printResults(os.Stdout, rows); err != nil {
		log.Fatal(err)
	}
}

// query returns a row iterator suitable for reading query results.
func query(ctx context.Context, client *bigquery.Client) (*bigquery.RowIterator, error) {

	query := client.Query(
		`SELECT 
  concat(word, ":", corpus) as RowKey, 
  <dataset-id>.toMyProtoMessage(
    STRUCT(
      word, 
      CAST(word_count AS BIGNUMERIC)
    )
  ) AS ProtoResult 
FROM 
  ` + "` bigquery - public - data.samples.shakespeare `" + ` 
LIMIT 
  100;
`)
	return query.Read(ctx)
}

// printResults prints results from a query.
func printResults(w io.Writer, iter *bigquery.RowIterator) error {
	for {
		var row Row
		err := iter.Next(&row)
		if err == iterator.Done {
			return nil
		}
		if err != nil {
			return fmt.Errorf("error iterating through results: %w", err)
		}
		message := &pb.TestMessage{}
		if err = proto.Unmarshal(row.Proto, message); err != nil {
			return err
		}
		fmt.Fprintf(w, "rowKey: %s, message: %v\n", row.RowKey, message)
	}
}

Java

package proto;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.FieldValueList;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.TableResult;
import path.to.proto.TestMessage;
import java.util.UUID;

/** Queries Google BigQuery */
public final class Main {
  public static void main(String[] args) throws Exception {
    String projectId = "your-project-id";
    BigQuery bigquery = BigQueryOptions.newBuilder().setProjectId(projectId).build().getService();

    QueryJobConfiguration queryConfig =
        QueryJobConfiguration.newBuilder(
                " SELECT "
                    + "concat(word , \":\",corpus) as RowKey,"
                    + "<dataset-id>.toMyProtoMessage(STRUCT(word, "
                    + "CAST(word_count AS BIGNUMERIC))) AS ProtoResult "
                    + "FROM "
                    + "`bigquery-public-data.samples.shakespeare` "
                    + "ORDER BY word_count DESC "
                    + "LIMIT 20")
            .setUseLegacySql(false)
            .build();

    // Create a job ID so that we can safely retry.
    JobId jobId = JobId.of(UUID.randomUUID().toString());
    Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());

    // Wait for the query to complete.
    queryJob = queryJob.waitFor();

    // Check for errors
    if (queryJob == null) {
      throw new RuntimeException("Job no longer exists");
    } else if (queryJob.getStatus().getError() != null) {
      // You can also look at queryJob.getStatus().getExecutionErrors() for all
      // errors, not just the latest one.
      throw new RuntimeException(queryJob.getStatus().getError().toString());
    }

    // Get the results.
    TableResult result = queryJob.getQueryResults();

    // Print all pages of the results.
    for (FieldValueList row : result.iterateAll()) {
      String key = row.get("RowKey").getStringValue();
      byte[] message = row.get("ProtoResult").getBytesValue();
      TestMessage testMessage = TestMessage.parseFrom(message);
      System.out.printf("rowKey: %s, message: %s\n", key, testMessage);
    }
  }
}

Python

"""Queries Google BigQuery."""

from google.cloud import bigquery
from path.to.proto import awesome_pb2


def main():
  project_id = "your-project-id"
  client = bigquery.Client(project=project_id)
  query_job = client.query(query="""
               SELECT
			concat(word , ":",corpus) as RowKey,
			<dataset-id>.toMyProtoMessage(
			    STRUCT(
			      word, 
			      CAST(word_count AS BIGNUMERIC)
			    )
			  ) AS ProtoResult 
		FROM
				  `bigquery-public-data.samples.shakespeare`
		ORDER BY word_count DESC
		LIMIT 20
    """)
  rows = query_job.result()
  for row in rows:
    message = awesome_pb2.TestMessage()
    message.ParseFromString(row.get("ProtoResult"))
    print(
        "rowKey: {}, message: {}".format(row.get("RowKey"), message)
    )