Trasmettere dati in streaming utilizzando l'API Storage Write
Questo documento descrive come utilizzare l'API BigQuery Storage Write per trasmettere dati in streaming in BigQuery.
Negli scenari di streaming, i dati arrivano continuamente e devono essere disponibili per le letture con latenza minima. Quando utilizzi l'API BigQuery Storage Write per i carichi di lavoro di streaming, valuta le garanzie di cui hai bisogno:
- Se la tua applicazione ha bisogno solo della semantica almeno una volta, utilizza lo stream predefinito.
- Se hai bisogno della semantica exactly-once, crea uno o più stream di tipo committed e utilizza gli offset dello stream per garantire scritture exactly-once.
Nel tipo di commit, i dati scritti nel flusso sono disponibili per le query non appena il server riconosce la richiesta di scrittura. Il flusso predefinito utilizza anche il tipo di impegno, ma non fornisce garanzie di consegna "exactly-once".
Utilizzare il flusso predefinito per la semantica almeno una volta
Se la tua applicazione può accettare la possibilità che nella tabella di destinazione vengano visualizzati record duplicati, ti consigliamo di utilizzare lo stream predefinito per gli scenari di streaming.
Il seguente codice mostra come scrivere i dati nel flusso predefinito:
Java
Per scoprire come installare e utilizzare la libreria client per BigQuery, consulta Librerie client di BigQuery. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API BigQuery Java.
Per eseguire l'autenticazione in BigQuery, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configurare l'autenticazione per le librerie client.
Node.js
Per scoprire come installare e utilizzare la libreria client per BigQuery, consulta Librerie client di BigQuery.
Per eseguire l'autenticazione in BigQuery, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configurare l'autenticazione per le librerie client.
Python
Questo esempio mostra come inserire un record con due campi utilizzando lo stream predefinito:
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2
import logging
import json
import sample_data_pb2
# The list of columns from the table's schema to search in the given data to write to BigQuery.
TABLE_COLUMNS_TO_CHECK = [
"name",
"age"
]
# Function to create a batch of row data to be serialized.
def create_row_data(data):
row = sample_data_pb2.SampleData()
for field in TABLE_COLUMNS_TO_CHECK:
# Optional fields will be passed as null if not provided
if field in data:
setattr(row, field, data[field])
return row.SerializeToString()
class BigQueryStorageWriteAppend(object):
# The stream name is: projects/{project}/datasets/{dataset}/tables/{table}/_default
def append_rows_proto2(
project_id: str, dataset_id: str, table_id: str, data: dict
):
write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path(project_id, dataset_id, table_id)
stream_name = f'{parent}/_default'
write_stream = types.WriteStream()
# Create a template with fields needed for the first request.
request_template = types.AppendRowsRequest()
# The request must contain the stream name.
request_template.write_stream = stream_name
# Generating the protocol buffer representation of the message descriptor.
proto_schema = types.ProtoSchema()
proto_descriptor = descriptor_pb2.DescriptorProto()
sample_data_pb2.SampleData.DESCRIPTOR.CopyToProto(proto_descriptor)
proto_schema.proto_descriptor = proto_descriptor
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.writer_schema = proto_schema
request_template.proto_rows = proto_data
# Construct an AppendRowsStream to send an arbitrary number of requests to a stream.
append_rows_stream = writer.AppendRowsStream(write_client, request_template)
# Append proto2 serialized bytes to the serialized_rows repeated field using create_row_data.
proto_rows = types.ProtoRows()
for row in data:
proto_rows.serialized_rows.append(create_row_data(row))
# Appends data to the given stream.
request = types.AppendRowsRequest()
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.rows = proto_rows
request.proto_rows = proto_data
append_rows_stream.send(request)
print(f"Rows to table: '{parent}' have been written.")
if __name__ == "__main__":
###### Uncomment the below block to provide additional logging capabilities ######
#logging.basicConfig(
# level=logging.DEBUG,
# format="%(asctime)s [%(levelname)s] %(message)s",
# handlers=[
# logging.StreamHandler()
# ]
#)
###### Uncomment the above block to provide additional logging capabilities ######
with open('entries.json', 'r') as json_file:
data = json.load(json_file)
# Change this to your specific BigQuery project, dataset, table details
BigQueryStorageWriteAppend.append_rows_proto2("PROJECT_ID","DATASET_ID", "TABLE_ID ",data=data)
Questo esempio di codice dipende dal modulo del protocollo compilato sample_data_pb2.py
. Per creare il modulo compilato, esegui il comando
protoc --python_out=. sample_data.proto
, dove protoc
è il
compilatore del buffer del protocollo. Il file sample_data.proto
definisce il formato
dei messaggi utilizzati nell'esempio Python. Per installare il compilatore protoc
, segui le istruzioni riportate in Protocol Buffers - Google's data interchange format.
Di seguito sono riportati i contenuti del file sample_data.proto
:
message SampleData {
required string name = 1;
required int64 age = 2;
}
Questo script utilizza il file entries.json
, che contiene dati di esempio delle righe da inserire nella tabella BigQuery:
{"name": "Jim", "age": 35}
{"name": "Jane", "age": 27}
Utilizzare il multiplexing
Abiliti il
multiplexing
a livello di stream writer solo per lo stream predefinito. Per abilitare il multiplexing in
Java, chiama il metodo setEnableConnectionPool
quando crei un oggetto
StreamWriter
o JsonStreamWriter
.
Dopo aver abilitato il pool di connessioni, la libreria client Java gestisce le connessioni in background, aumentandone il numero se quelle esistenti sono considerate troppo occupate. Per rendere più efficace lo scale up automatico, ti consigliamo di ridurre il limite di maxInflightRequests
.
// One possible way for constructing StreamWriter StreamWriter.newBuilder(streamName) .setWriterSchema(protoSchema) .setEnableConnectionPool(true) .setMaxInflightRequests(100) .build(); // One possible way for constructing JsonStreamWriter JsonStreamWriter.newBuilder(tableName, bigqueryClient) .setEnableConnectionPool(true) .setMaxInflightRequests(100) .build();
Per attivare il multiplexing in Go, vedi Condivisione della connessione (multiplexing).
Utilizza il tipo committed per la semantica exactly-once
Se hai bisogno di una semantica di scrittura esattamente una volta, crea un flusso di scrittura di tipo committed. Nel tipo Committed, i record sono disponibili per la query non appena il client riceve la conferma dal backend.
Il tipo Committed fornisce la consegna "exactly-once" all'interno di uno stream tramite l'utilizzo di offset dei record. Utilizzando gli offset dei record, l'applicazione specifica l'offset di accodamento successivo in ogni chiamata a AppendRows
. L'operazione di scrittura viene eseguita
solo se il valore di offset corrisponde all'offset di accodamento successivo. Per saperne di più, consulta Gestire gli offset del flusso per ottenere la semantica exactly-once.
Se non fornisci un offset, i record vengono aggiunti alla fine corrente dello stream. In questo caso, se una richiesta di aggiunta restituisce un errore, il nuovo tentativo potrebbe comportare la visualizzazione del record più di una volta nel flusso.
Per utilizzare il tipo di impegno, segui questi passaggi:
Java
- Chiama
CreateWriteStream
per creare uno o più stream di tipo committed. - Per ogni stream, chiama
AppendRows
in un ciclo per scrivere batch di record. - Chiama
FinalizeWriteStream
per ogni stream per rilasciarlo. Dopo aver chiamato questo metodo, non puoi scrivere altre righe nel flusso. Questo passaggio è facoltativo per il tipo di impegno, ma contribuisce a evitare di superare il limite di stream attivi. Per ulteriori informazioni, vedi Limitare la velocità di creazione di stream.
Node.js
- Chiama
createWriteStreamFullResponse
per creare uno o più stream di tipo committed. - Per ogni stream, chiama
appendRows
in un ciclo per scrivere batch di record. - Chiama
finalize
per ogni stream per rilasciarlo. Dopo aver chiamato questo metodo, non puoi scrivere altre righe nel flusso. Questo passaggio è facoltativo per il tipo di impegno, ma contribuisce a evitare di superare il limite di stream attivi. Per ulteriori informazioni, vedi Limitare la velocità di creazione di stream.
Non puoi eliminare uno stream in modo esplicito. Gli stream seguono la durata (TTL) definita dal sistema:
- Un flusso di dati di cui è stato eseguito il commit ha un TTL di tre giorni se non c'è traffico sul flusso.
- Per impostazione predefinita, un flusso memorizzato nel buffer ha un TTL di sette giorni se non c'è traffico sul flusso.
Il seguente codice mostra come utilizzare il tipo committed:
Java
Per scoprire come installare e utilizzare la libreria client per BigQuery, consulta Librerie client di BigQuery. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API BigQuery Java.
Per eseguire l'autenticazione in BigQuery, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configurare l'autenticazione per le librerie client.
Node.js
Per scoprire come installare e utilizzare la libreria client per BigQuery, consulta Librerie client di BigQuery.
Per eseguire l'autenticazione in BigQuery, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configurare l'autenticazione per le librerie client.
Utilizzare il formato Apache Arrow per importare i dati
Il seguente codice mostra come importare i dati utilizzando il formato Apache Arrow. Per un esempio end-to-end più dettagliato, consulta l'esempio PyArrow su GitHub.
Python
Questo esempio mostra come importare una tabella PyArrow serializzata utilizzando lo stream predefinito.
from google.cloud.bigquery_storage_v1 import types as gapic_types
from google.cloud.bigquery_storage_v1.writer import AppendRowsStream
from google.cloud import bigquery_storage_v1
def append_rows_with_pyarrow(
pyarrow_table: pyarrow.Table,
project_id: str,
dataset_id: str,
table_id: str,
):
bqstorage_write_client = bigquery_storage_v1.BigQueryWriteClient()
# Create request_template.
request_template = gapic_types.AppendRowsRequest()
request_template.write_stream = (
f"projects/{project_id}/datasets/{dataset_id}/tables/{table_id}/_default"
)
arrow_data = gapic_types.AppendRowsRequest.ArrowData()
arrow_data.writer_schema.serialized_schema = (
pyarrow_table.schema.serialize().to_pybytes()
)
request_template.arrow_rows = arrow_data
# Create AppendRowsStream.
append_rows_stream = AppendRowsStream(
bqstorage_write_client,
request_template,
)
# Create request with table data.
request = gapic_types.AppendRowsRequest()
request.arrow_rows.rows.serialized_record_batch = (
pyarrow_table.to_batches()[0].serialize().to_pybytes()
)
# Send request.
future = append_rows_stream.send(request)
# Wait for result.
future.result()