Transmitir datos con la API Storage Write
En este documento se describe cómo usar la API Storage Write de BigQuery para transmitir datos a BigQuery.
En los casos prácticos de streaming, los datos llegan de forma continua y deben estar disponibles para lecturas con una latencia mínima. Cuando uses la API Storage Write de BigQuery para cargas de trabajo de streaming, ten en cuenta las garantías que necesitas:
- Si tu aplicación solo necesita la semántica de al menos una vez, usa el stream predeterminado.
- Si necesitas una semántica de procesamiento "una sola vez", crea uno o varios flujos de tipo confirmado y usa los desplazamientos de flujo para garantizar que las escrituras se realicen una sola vez.
En el tipo confirmado, los datos escritos en el flujo están disponibles para las consultas en cuanto el servidor confirma la solicitud de escritura. El flujo predeterminado también usa el tipo confirmado, pero no ofrece garantías de entrega única.
Usar el flujo predeterminado para la semántica de al menos una vez
Si tu aplicación puede aceptar la posibilidad de que aparezcan registros duplicados en la tabla de destino, te recomendamos que utilices el stream predeterminado en los casos de streaming.
En el siguiente código se muestra cómo escribir datos en el flujo predeterminado:
Java
Para saber cómo instalar y usar la biblioteca de cliente de BigQuery, consulta Bibliotecas de cliente de BigQuery. Para obtener más información, consulta la documentación de referencia de la API Java de BigQuery.
Para autenticarte en BigQuery, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación para bibliotecas de cliente.
Node.js
Para saber cómo instalar y usar la biblioteca de cliente de BigQuery, consulta Bibliotecas de cliente de BigQuery.
Para autenticarte en BigQuery, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación para bibliotecas de cliente.
Python
En este ejemplo se muestra cómo insertar un registro con dos campos mediante el flujo predeterminado:
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2
import logging
import json
import sample_data_pb2
# The list of columns from the table's schema to search in the given data to write to BigQuery.
TABLE_COLUMNS_TO_CHECK = [
"name",
"age"
]
# Function to create a batch of row data to be serialized.
def create_row_data(data):
row = sample_data_pb2.SampleData()
for field in TABLE_COLUMNS_TO_CHECK:
# Optional fields will be passed as null if not provided
if field in data:
setattr(row, field, data[field])
return row.SerializeToString()
class BigQueryStorageWriteAppend(object):
# The stream name is: projects/{project}/datasets/{dataset}/tables/{table}/_default
def append_rows_proto2(
project_id: str, dataset_id: str, table_id: str, data: dict
):
write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path(project_id, dataset_id, table_id)
stream_name = f'{parent}/_default'
write_stream = types.WriteStream()
# Create a template with fields needed for the first request.
request_template = types.AppendRowsRequest()
# The request must contain the stream name.
request_template.write_stream = stream_name
# Generating the protocol buffer representation of the message descriptor.
proto_schema = types.ProtoSchema()
proto_descriptor = descriptor_pb2.DescriptorProto()
sample_data_pb2.SampleData.DESCRIPTOR.CopyToProto(proto_descriptor)
proto_schema.proto_descriptor = proto_descriptor
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.writer_schema = proto_schema
request_template.proto_rows = proto_data
# Construct an AppendRowsStream to send an arbitrary number of requests to a stream.
append_rows_stream = writer.AppendRowsStream(write_client, request_template)
# Append proto2 serialized bytes to the serialized_rows repeated field using create_row_data.
proto_rows = types.ProtoRows()
for row in data:
proto_rows.serialized_rows.append(create_row_data(row))
# Appends data to the given stream.
request = types.AppendRowsRequest()
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.rows = proto_rows
request.proto_rows = proto_data
append_rows_stream.send(request)
print(f"Rows to table: '{parent}' have been written.")
if __name__ == "__main__":
###### Uncomment the below block to provide additional logging capabilities ######
#logging.basicConfig(
# level=logging.DEBUG,
# format="%(asctime)s [%(levelname)s] %(message)s",
# handlers=[
# logging.StreamHandler()
# ]
#)
###### Uncomment the above block to provide additional logging capabilities ######
with open('entries.json', 'r') as json_file:
data = json.load(json_file)
# Change this to your specific BigQuery project, dataset, table details
BigQueryStorageWriteAppend.append_rows_proto2("PROJECT_ID","DATASET_ID", "TABLE_ID ",data=data)
Este ejemplo de código depende del módulo de protocolo compilado sample_data_pb2.py
. Para crear el módulo compilado, ejecuta el comando protoc --python_out=. sample_data.proto
, donde protoc
es el compilador de búfer de protocolo. El archivo sample_data.proto
define el formato de los mensajes utilizados en el ejemplo de Python. Para instalar el compilador protoc
, sigue las instrucciones que se indican en Búferes de protocolo: formato de intercambio de datos de Google.
Este es el contenido del archivo sample_data.proto
:
message SampleData {
required string name = 1;
required int64 age = 2;
}
Esta secuencia de comandos usa el archivo entries.json
, que contiene datos de filas de ejemplo que se insertarán en la tabla de BigQuery:
{"name": "Jim", "age": 35}
{"name": "Jane", "age": 27}
Usar la multiplexación
Solo puedes habilitar el multiplexado a nivel de escritor de flujo para el flujo predeterminado. Para habilitar el multiplexado en Java, llama al método setEnableConnectionPool
cuando crees un objeto StreamWriter
o JsonStreamWriter
.
Después de habilitar el grupo de conexiones, la biblioteca de cliente de Java gestiona tus conexiones en segundo plano y aumenta el número de conexiones si las existentes están demasiado ocupadas. Para que el escalado automático sea más eficaz, te recomendamos que reduzcas el maxInflightRequests
límite.
// One possible way for constructing StreamWriter StreamWriter.newBuilder(streamName) .setWriterSchema(protoSchema) .setEnableConnectionPool(true) .setMaxInflightRequests(100) .build(); // One possible way for constructing JsonStreamWriter JsonStreamWriter.newBuilder(tableName, bigqueryClient) .setEnableConnectionPool(true) .setMaxInflightRequests(100) .build();
Para habilitar la multiplexación en Go, consulta Compartir conexión (multiplexación).
Usa el tipo confirmado para la semántica de entrega única
Si necesitas una semántica de escritura exactamente una vez, crea un flujo de escritura de tipo confirmado. En el tipo confirmado, los registros están disponibles para las consultas en cuanto el cliente recibe la confirmación del backend.
El tipo Confirmado proporciona una entrega única en una secuencia mediante el uso de desplazamientos de registros. Al usar los desplazamientos de registro, la aplicación especifica el siguiente desplazamiento de anexión en cada llamada a AppendRows
. La operación de escritura solo se realiza si el valor de desplazamiento coincide con el siguiente desplazamiento de anexión. Para obtener más información, consulta Gestionar los desplazamientos de la secuencia para conseguir una semántica de una sola vez.
Si no proporcionas un desplazamiento, los registros se añadirán al final de la secuencia. En ese caso, si una solicitud de anexión devuelve un error, volver a intentarlo podría provocar que el registro aparezca más de una vez en el flujo.
Para usar el tipo confirmado, sigue estos pasos:
Java
- Llama a
CreateWriteStream
para crear uno o varios flujos de tipo comprometido. - En cada flujo, llama a
AppendRows
en un bucle para escribir lotes de registros. - Llama a
FinalizeWriteStream
para cada flujo para liberar el flujo. Después de llamar a este método, no podrás escribir más filas en el flujo. Este paso es opcional en el tipo de compromiso, pero ayuda a evitar que se supere el límite de las emisiones activas. Para obtener más información, consulta Limitar la frecuencia de creación de streams.
Node.js
- Llama a
createWriteStreamFullResponse
para crear uno o varios flujos de tipo comprometido. - En cada flujo, llama a
appendRows
en un bucle para escribir lotes de registros. - Llama a
finalize
para cada flujo para liberar el flujo. Después de llamar a este método, no podrás escribir más filas en el flujo. Este paso es opcional en el tipo de compromiso, pero ayuda a evitar que se supere el límite de las emisiones activas. Para obtener más información, consulta Limitar la frecuencia de creación de streams.
No puedes eliminar una novedad de forma explícita. Los flujos siguen el tiempo de vida (TTL) definido por el sistema:
- Una secuencia confirmada tiene un TTL de tres días si no hay tráfico en la secuencia.
- De forma predeterminada, un flujo almacenado en búfer tiene un TTL de siete días si no hay tráfico en el flujo.
En el siguiente código se muestra cómo usar el tipo confirmado:
Java
Para saber cómo instalar y usar la biblioteca de cliente de BigQuery, consulta Bibliotecas de cliente de BigQuery. Para obtener más información, consulta la documentación de referencia de la API Java de BigQuery.
Para autenticarte en BigQuery, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación para bibliotecas de cliente.
Node.js
Para saber cómo instalar y usar la biblioteca de cliente de BigQuery, consulta Bibliotecas de cliente de BigQuery.
Para autenticarte en BigQuery, configura las credenciales predeterminadas de la aplicación. Para obtener más información, consulta el artículo Configurar la autenticación para bibliotecas de cliente.
Usar el formato Apache Arrow para ingerir datos
El siguiente código muestra cómo ingerir datos con el formato Apache Arrow. Para ver un ejemplo completo más detallado, consulta el ejemplo de PyArrow en GitHub.
Python
En este ejemplo se muestra cómo ingerir una tabla de PyArrow serializada mediante el flujo predeterminado.
from google.cloud.bigquery_storage_v1 import types as gapic_types
from google.cloud.bigquery_storage_v1.writer import AppendRowsStream
from google.cloud import bigquery_storage_v1
def append_rows_with_pyarrow(
pyarrow_table: pyarrow.Table,
project_id: str,
dataset_id: str,
table_id: str,
):
bqstorage_write_client = bigquery_storage_v1.BigQueryWriteClient()
# Create request_template.
request_template = gapic_types.AppendRowsRequest()
request_template.write_stream = (
f"projects/{project_id}/datasets/{dataset_id}/tables/{table_id}/_default"
)
arrow_data = gapic_types.AppendRowsRequest.ArrowData()
arrow_data.writer_schema.serialized_schema = (
pyarrow_table.schema.serialize().to_pybytes()
)
request_template.arrow_rows = arrow_data
# Create AppendRowsStream.
append_rows_stream = AppendRowsStream(
bqstorage_write_client,
request_template,
)
# Create request with table data.
request = gapic_types.AppendRowsRequest()
request.arrow_rows.rows.serialized_record_batch = (
pyarrow_table.to_batches()[0].serialize().to_pybytes()
)
# Send request.
future = append_rows_stream.send(request)
# Wait for result.
future.result()