Transmite datos con la API de Storage Write
En este documento, se describe cómo usar la API de BigQuery Storage Write para transmitir datos a BigQuery.
En situaciones de transmisión, los datos llegan de forma continua y deberían estar disponibles para las lecturas con una latencia mínima. Cuando uses la API de BigQuery Storage Write para las cargas de trabajo de transmisión, considera qué garantías necesitas:
- Si tu aplicación solo necesita la semántica de al menos una vez, usa la transmisión predeterminada.
- Si necesitas una semántica de tipo "exactamente una vez", crea una o más transmisiones en el tipo de confirmación y usa desplazamientos de transmisión para garantizar escrituras de tipo "exactamente una vez".
En el tipo de confirmación, los datos escritos en la transmisión están disponibles para consultas en cuanto el servidor confirma la solicitud de escritura. La transmisión predeterminada también usa el tipo de confirmación, pero no proporciona garantías del tipo "exactamente una vez".
Usa la transmisión predeterminada para obtener una semántica de al menos una vez
Si tu aplicación puede aceptar la posibilidad de que aparezcan registros duplicados en la tabla de destino, recomendamos usar la transmisión predeterminada para situaciones de transmisión.
En el siguiente código, se muestra cómo escribir datos en la transmisión predeterminada:
Java
Para aprender a instalar y usar la biblioteca cliente de BigQuery, consulta las bibliotecas cliente de BigQuery. Para obtener más información, consulta la documentación de referencia de la API de BigQuery para Java.
Para autenticarte en BigQuery, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para bibliotecas cliente.
Node.js
Para aprender a instalar y usar la biblioteca cliente de BigQuery, consulta las bibliotecas cliente de BigQuery.
Para autenticarte en BigQuery, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para bibliotecas cliente.
Python
En este ejemplo, se muestra cómo insertar un registro con dos campos usando el flujo predeterminado:
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2
import logging
import json
import sample_data_pb2
# The list of columns from the table's schema to search in the given data to write to BigQuery.
TABLE_COLUMNS_TO_CHECK = [
"name",
"age"
]
# Function to create a batch of row data to be serialized.
def create_row_data(data):
row = sample_data_pb2.SampleData()
for field in TABLE_COLUMNS_TO_CHECK:
# Optional fields will be passed as null if not provided
if field in data:
setattr(row, field, data[field])
return row.SerializeToString()
class BigQueryStorageWriteAppend(object):
# The stream name is: projects/{project}/datasets/{dataset}/tables/{table}/_default
def append_rows_proto2(
project_id: str, dataset_id: str, table_id: str, data: dict
):
write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path(project_id, dataset_id, table_id)
stream_name = f'{parent}/_default'
write_stream = types.WriteStream()
# Create a template with fields needed for the first request.
request_template = types.AppendRowsRequest()
# The request must contain the stream name.
request_template.write_stream = stream_name
# Generating the protocol buffer representation of the message descriptor.
proto_schema = types.ProtoSchema()
proto_descriptor = descriptor_pb2.DescriptorProto()
sample_data_pb2.SampleData.DESCRIPTOR.CopyToProto(proto_descriptor)
proto_schema.proto_descriptor = proto_descriptor
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.writer_schema = proto_schema
request_template.proto_rows = proto_data
# Construct an AppendRowsStream to send an arbitrary number of requests to a stream.
append_rows_stream = writer.AppendRowsStream(write_client, request_template)
# Append proto2 serialized bytes to the serialized_rows repeated field using create_row_data.
proto_rows = types.ProtoRows()
for row in data:
proto_rows.serialized_rows.append(create_row_data(row))
# Appends data to the given stream.
request = types.AppendRowsRequest()
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.rows = proto_rows
request.proto_rows = proto_data
append_rows_stream.send(request)
print(f"Rows to table: '{parent}' have been written.")
if __name__ == "__main__":
###### Uncomment the below block to provide additional logging capabilities ######
#logging.basicConfig(
# level=logging.DEBUG,
# format="%(asctime)s [%(levelname)s] %(message)s",
# handlers=[
# logging.StreamHandler()
# ]
#)
###### Uncomment the above block to provide additional logging capabilities ######
with open('entries.json', 'r') as json_file:
data = json.load(json_file)
# Change this to your specific BigQuery project, dataset, table details
BigQueryStorageWriteAppend.append_rows_proto2("PROJECT_ID","DATASET_ID", "TABLE_ID ",data=data)
Este ejemplo de código depende del módulo de protocolo compilado sample_data_pb2.py
. Para crear el módulo compilado, ejecuta el comando protoc --python_out=. sample_data.proto
, en el que protoc
es el compilador del búfer de protocolo. El archivo sample_data.proto
define el formato de los mensajes que se usan en el ejemplo de Python. Para instalar el compilador protoc
, sigue las instrucciones que se indican en Protocol Buffers: El formato de intercambio de datos de Google.
Este es el contenido del archivo sample_data.proto
:
message SampleData {
required string name = 1;
required int64 age = 2;
}
Esta secuencia de comandos consume el archivo entities.json
, que contiene datos de filas de muestra para insertarse en la tabla de BigQuery:
{"name": "Jim", "age": 35}
{"name": "Jane", "age": 27}
Usa multiplexación
Habilita la multiplexación en el nivel de escritor de transmisión solo para la transmisión predeterminada. Para habilitar la multiplexación en Java, llama al método setEnableConnectionPool
cuando construyas un objeto JsonStreamWriter
o StreamWriter
:
// One possible way for constructing StreamWriter StreamWriter.newBuilder(streamName) .setWriterSchema(protoSchema) .setEnableConnectionPool(true) .build(); // One possible way for constructing JsonStreamWriter JsonStreamWriter.newBuilder(tableName, bigqueryClient) .setEnableConnectionPool(true) .build();
Para habilitar la multiplexación en Go, consulta Uso compartido de la conexión (multiplexación).
Usa el modo de confirmación para la semántica del tipo "exactamente una vez"
Si necesitas una semántica de escritura del tipo "exactamente una vez", crea una transmisión de escritura en tipo de confirmación. En el tipo de confirmación, los registros están disponibles para consultas tan pronto como el cliente recibe la confirmación desde el backend.
El tipo de confirmación proporciona una entrega de tipo "exactamente única" dentro de una transmisión mediante el uso de compensaciones de registro. Mediante el uso de compensación de registro, la aplicación especifica la siguiente compensación de anexo en cada llamada a AppendRows
. La operación de escritura solo se realiza si el valor de compensación coincide con la siguiente compensación de anexo. Si quieres obtener más información, consulta Administra compensaciones de transmisión para lograr una semántica del tipo "exactamente una vez".
Si no proporcionas una compensación, los registros se agregan al final actual de la transmisión. En ese caso, si una solicitud anexada muestra un error, volver a intentar la operación podría hacer que el registro aparezca más de una vez en la transmisión.
Para usar el tipo de confirmación, sigue los siguientes pasos:
Java
- Llama a
CreateWriteStream
para crear una o más transmisiones en del tipo de confirmación. - Para cada transmisión, llama a
AppendRows
en un bucle a fin de escribir lotes de registros. - Llama a
FinalizeWriteStream
para cada transmisión a fin de liberarla. Después de llamar a este método, no puedes escribir más filas en la transmisión. Este paso es opcional en el tipo de confirmación, pero ayuda a evitar que se exceda el límite de las transmisiones activas. Para obtener más información, consulta Limita la velocidad de creación de transmisiones.
Node.js
- Llama a
createWriteStreamFullResponse
para crear una o más transmisiones en del tipo de confirmación. - Para cada transmisión, llama a
appendRows
en un bucle a fin de escribir lotes de registros. - Llama a
finalize
para cada transmisión a fin de liberarla. Después de llamar a este método, no puedes escribir más filas en la transmisión. Este paso es opcional en el tipo de confirmación, pero ayuda a evitar que se exceda el límite de las transmisiones activas. Para obtener más información, consulta Limita la velocidad de creación de transmisiones.
No puedes borrar una transmisión de forma explícita. Las transmisiones siguen el tiempo de actividad (TTL) definido por el sistema:
- Una transmisión confirmada tiene un TTL de tres días si no hay tráfico en la transmisión.
- De forma predeterminada, una transmisión almacenada en búfer tiene un TTL de siete días si no hay tráfico en la transmisión.
En el siguiente código, se muestra cómo usar el tipo de confirmación.
Java
Para aprender a instalar y usar la biblioteca cliente de BigQuery, consulta las bibliotecas cliente de BigQuery. Para obtener más información, consulta la documentación de referencia de la API de BigQuery para Java.
Para autenticarte en BigQuery, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para bibliotecas cliente.
Node.js
Para aprender a instalar y usar la biblioteca cliente de BigQuery, consulta las bibliotecas cliente de BigQuery.
Para autenticarte en BigQuery, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para bibliotecas cliente.