Utilizzare l'API Streaming precedente

Questo documento descrive come trasmettere in streaming i dati in BigQuery utilizzando il metodo legacy tabledata.insertAll.

Per i nuovi progetti, consigliamo di utilizzare l'API BigQuery Storage Write anziché il metodo tabledata.insertAll. L'API Storage Write offre prezzi inferiori e funzionalità più solide, tra cui la semantica di consegna "exactly-once". Se stai eseguendo la migrazione di un progetto esistente dal metodo tabledata.insertAll all'API Storage Write, ti consigliamo di selezionare lo stream predefinito. Il metodo tabledata.insertAll è ancora completamente supportato.

Prima di iniziare

  1. Assicurati di disporre dell'accesso in scrittura al set di dati che contiene la tabella di destinazione. La tabella deve esistere prima di iniziare a scrivervi i dati a meno che tu non stia utilizzando tabelle modello. Per saperne di più sulle tabelle modello, vedi Creare tabelle automaticamente utilizzando le tabelle modello.

  2. Consulta le norme relative alle quote per i dati di streaming.

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Lo streaming non è disponibile tramite il livello gratuito. Se provi a utilizzare lo streaming senza attivare la fatturazione, viene visualizzato il seguente errore: BigQuery: Streaming insert is not allowed in the free tier.

  5. Concedi ruoli IAM (Identity and Access Management) che forniscono agli utenti le autorizzazioni necessarie per eseguire ogni attività descritta in questo documento.

Autorizzazioni obbligatorie

Per trasmettere dati in streaming in BigQuery, devi disporre delle seguenti autorizzazioni IAM:

  • bigquery.tables.updateData (consente di inserire dati nella tabella)
  • bigquery.tables.get (consente di ottenere i metadati della tabella)
  • bigquery.datasets.get (consente di ottenere i metadati del set di dati)
  • bigquery.tables.create (obbligatorio se utilizzi una tabella modello per creare automaticamente la tabella)

Ciascuno dei seguenti ruoli IAM predefiniti include le autorizzazioni necessarie per trasmettere dati in streaming a BigQuery:

  • roles/bigquery.dataEditor
  • roles/bigquery.dataOwner
  • roles/bigquery.admin

Per saperne di più sui ruoli e sulle autorizzazioni IAM in BigQuery, consulta Ruoli e autorizzazioni predefiniti.

Trasmettere flussi di dati a BigQuery

C#

Prima di provare questo esempio, segui le istruzioni di configurazione di C# nella guida rapida di BigQuery per l'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API BigQuery C#.

Per eseguire l'autenticazione in BigQuery, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configurare l'autenticazione per le librerie client.


using Google.Cloud.BigQuery.V2;

public class BigQueryTableInsertRows
{
    public void TableInsertRows(
        string projectId = "your-project-id",
        string datasetId = "your_dataset_id",
        string tableId = "your_table_id"
    )
    {
        BigQueryClient client = BigQueryClient.Create(projectId);
        BigQueryInsertRow[] rows = new BigQueryInsertRow[]
        {
            // The insert ID is optional, but can avoid duplicate data
            // when retrying inserts.
            new BigQueryInsertRow(insertId: "row1") {
                { "name", "Washington" },
                { "post_abbr", "WA" }
            },
            new BigQueryInsertRow(insertId: "row2") {
                { "name", "Colorado" },
                { "post_abbr", "CO" }
            }
        };
        client.InsertRows(datasetId, tableId, rows);
    }
}

Go

Prima di provare questo esempio, segui le istruzioni di configurazione di Go nella guida rapida di BigQuery per l'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API BigQuery Go.

Per eseguire l'autenticazione in BigQuery, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configurare l'autenticazione per le librerie client.

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// Item represents a row item.
type Item struct {
	Name string
	Age  int
}

// Save implements the ValueSaver interface.
// This example disables best-effort de-duplication, which allows for higher throughput.
func (i *Item) Save() (map[string]bigquery.Value, string, error) {
	return map[string]bigquery.Value{
		"full_name": i.Name,
		"age":       i.Age,
	}, bigquery.NoDedupeID, nil
}

// insertRows demonstrates inserting data into a table using the streaming insert mechanism.
func insertRows(projectID, datasetID, tableID string) error {
	// projectID := "my-project-id"
	// datasetID := "mydataset"
	// tableID := "mytable"
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("bigquery.NewClient: %w", err)
	}
	defer client.Close()

	inserter := client.Dataset(datasetID).Table(tableID).Inserter()
	items := []*Item{
		// Item implements the ValueSaver interface.
		{Name: "Phred Phlyntstone", Age: 32},
		{Name: "Wylma Phlyntstone", Age: 29},
	}
	if err := inserter.Put(ctx, items); err != nil {
		return err
	}
	return nil
}

Java

Prima di provare questo esempio, segui le istruzioni di configurazione di Java nella guida rapida di BigQuery per l'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API BigQuery Java.

Per eseguire l'autenticazione in BigQuery, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configurare l'autenticazione per le librerie client.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to inserting rows into a table without running a load job.
public class TableInsertRows {

  public static void main(String[] args) {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    // Create a row to insert
    Map<String, Object> rowContent = new HashMap<>();
    rowContent.put("booleanField", true);
    rowContent.put("numericField", "3.14");
    // TODO(developer): Replace the row id with a unique value for each row.
    String rowId = "ROW_ID";
    tableInsertRows(datasetName, tableName, rowId, rowContent);
  }

  public static void tableInsertRows(
      String datasetName, String tableName, String rowId, Map<String, Object> rowContent) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

      // Get table
      TableId tableId = TableId.of(datasetName, tableName);

      // Inserts rowContent into datasetName:tableId.
      InsertAllResponse response =
          bigquery.insertAll(
              InsertAllRequest.newBuilder(tableId)
                  // More rows can be added in the same RPC by invoking .addRow() on the builder.
                  // You can omit the unique row ids to disable de-duplication.
                  .addRow(rowId, rowContent)
                  .build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Node.js

Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js nella guida rapida di BigQuery per l'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API BigQuery Node.js.

Per eseguire l'autenticazione in BigQuery, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configurare l'autenticazione per le librerie client.

// Import the Google Cloud client library
const {BigQuery} = require('@google-cloud/bigquery');
const bigquery = new BigQuery();

async function insertRowsAsStream() {
  // Inserts the JSON objects into my_dataset:my_table.

  /**
   * TODO(developer): Uncomment the following lines before running the sample.
   */
  // const datasetId = 'my_dataset';
  // const tableId = 'my_table';
  const rows = [
    {name: 'Tom', age: 30},
    {name: 'Jane', age: 32},
  ];

  // Insert data into a table
  await bigquery.dataset(datasetId).table(tableId).insert(rows);
  console.log(`Inserted ${rows.length} rows`);
}

PHP

Prima di provare questo esempio, segui le istruzioni di configurazione di PHP nella guida rapida di BigQuery per l'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API BigQuery PHP.

Per eseguire l'autenticazione in BigQuery, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configurare l'autenticazione per le librerie client.

use Google\Cloud\BigQuery\BigQueryClient;

/**
 * Stream data into bigquery
 *
 * @param string $projectId The project Id of your Google Cloud Project.
 * @param string $datasetId The BigQuery dataset ID.
 * @param string $tableId The BigQuery table ID.
 * @param string $data Json encoded data For eg,
 *    $data = json_encode([
 *       "field1" => "value1",
 *       "field2" => "value2",
 *    ]);
 */
function stream_row(
    string $projectId,
    string $datasetId,
    string $tableId,
    string $data
): void {
    // instantiate the bigquery table service
    $bigQuery = new BigQueryClient([
      'projectId' => $projectId,
    ]);
    $dataset = $bigQuery->dataset($datasetId);
    $table = $dataset->table($tableId);

    $data = json_decode($data, true);
    $insertResponse = $table->insertRows([
      ['data' => $data],
      // additional rows can go here
    ]);
    if ($insertResponse->isSuccessful()) {
        print('Data streamed into BigQuery successfully' . PHP_EOL);
    } else {
        foreach ($insertResponse->failedRows() as $row) {
            foreach ($row['errors'] as $error) {
                printf('%s: %s' . PHP_EOL, $error['reason'], $error['message']);
            }
        }
    }
}

Python

Prima di provare questo esempio, segui le istruzioni di configurazione di Python nella guida rapida di BigQuery per l'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API BigQuery Python.

Per eseguire l'autenticazione in BigQuery, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configurare l'autenticazione per le librerie client.

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {"full_name": "Phred Phlyntstone", "age": 32},
    {"full_name": "Wylma Phlyntstone", "age": 29},
]

errors = client.insert_rows_json(table_id, rows_to_insert)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

Ruby

Prima di provare questo esempio, segui le istruzioni di configurazione di Ruby nella guida rapida di BigQuery per l'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API BigQuery Ruby.

Per eseguire l'autenticazione in BigQuery, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configurare l'autenticazione per le librerie client.

require "google/cloud/bigquery"

def table_insert_rows dataset_id = "your_dataset_id", table_id = "your_table_id"
  bigquery = Google::Cloud::Bigquery.new
  dataset  = bigquery.dataset dataset_id
  table    = dataset.table table_id

  row_data = [
    { name: "Alice", value: 5  },
    { name: "Bob",   value: 10 }
  ]
  response = table.insert row_data

  if response.success?
    puts "Inserted rows successfully"
  else
    puts "Failed to insert #{response.error_rows.count} rows"
  end
end

Non è necessario compilare il campo insertID quando inserisci le righe. L'esempio seguente mostra come evitare di inviare un insertID per ogni riga durante lo streaming.

Java

Prima di provare questo esempio, segui le istruzioni di configurazione di Java nella guida rapida di BigQuery per l'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API BigQuery Java.

Per eseguire l'autenticazione in BigQuery, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configurare l'autenticazione per le librerie client.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryError;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.InsertAllRequest;
import com.google.cloud.bigquery.InsertAllResponse;
import com.google.cloud.bigquery.TableId;
import com.google.common.collect.ImmutableList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sample to insert rows without row ids in a table
public class TableInsertRowsWithoutRowIds {

  public static void main(String[] args) {
    // TODO(developer): Replace these variables before running the sample.
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";
    tableInsertRowsWithoutRowIds(datasetName, tableName);
  }

  public static void tableInsertRowsWithoutRowIds(String datasetName, String tableName) {
    try {
      // Initialize client that will be used to send requests. This client only needs to be created
      // once, and can be reused for multiple requests.
      final BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
      // Create rows to insert
      Map<String, Object> rowContent1 = new HashMap<>();
      rowContent1.put("stringField", "Phred Phlyntstone");
      rowContent1.put("numericField", 32);
      Map<String, Object> rowContent2 = new HashMap<>();
      rowContent2.put("stringField", "Wylma Phlyntstone");
      rowContent2.put("numericField", 29);
      InsertAllResponse response =
          bigquery.insertAll(
              InsertAllRequest.newBuilder(TableId.of(datasetName, tableName))
                  // No row ids disable de-duplication, and also disable the retries in the Java
                  // library.
                  .setRows(
                      ImmutableList.of(
                          InsertAllRequest.RowToInsert.of(rowContent1),
                          InsertAllRequest.RowToInsert.of(rowContent2)))
                  .build());

      if (response.hasErrors()) {
        // If any of the insertions failed, this lets you inspect the errors
        for (Map.Entry<Long, List<BigQueryError>> entry : response.getInsertErrors().entrySet()) {
          System.out.println("Response error: \n" + entry.getValue());
        }
      }
      System.out.println("Rows successfully inserted into table without row ids");
    } catch (BigQueryException e) {
      System.out.println("Insert operation not performed \n" + e.toString());
    }
  }
}

Python

Prima di provare questo esempio, segui le istruzioni di configurazione di Python nella guida rapida di BigQuery per l'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API BigQuery Python.

Per eseguire l'autenticazione in BigQuery, configura le Credenziali predefinite dell'applicazione. Per saperne di più, vedi Configurare l'autenticazione per le librerie client.

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

# TODO(developer): Set table_id to the ID of table to append to.
# table_id = "your-project.your_dataset.your_table"

rows_to_insert = [
    {"full_name": "Phred Phlyntstone", "age": 32},
    {"full_name": "Wylma Phlyntstone", "age": 29},
]

errors = client.insert_rows_json(
    table_id, rows_to_insert, row_ids=[None] * len(rows_to_insert)
)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))

Inviare dati di data e ora

Per i campi data e ora, formatta i dati nel metodo tabledata.insertAll come segue:

Tipo Formato
DATE Una stringa nel formato "YYYY-MM-DD"
DATETIME Una stringa nel formato "YYYY-MM-DD [HH:MM:SS]"
TIME Una stringa nel formato "HH:MM:SS"
TIMESTAMP Il numero di secondi dal 1° gennaio 1970 (l'epoca Unix) o una stringa nel formato "YYYY-MM-DD HH:MM[:SS]"

Inviare i dati dell'intervallo

Per i campi di tipo RANGE<T>, formatta i dati nel metodo tabledata.insertAll come oggetto JSON con due campi, start e end. I valori mancanti o NULL per i campi start e end rappresentano limiti illimitati. Questi campi devono avere lo stesso formato JSON supportato di tipo T, dove T può essere uno tra DATE, DATETIME e TIMESTAMP.

Nell'esempio seguente, il campo f_range_date rappresenta una colonna RANGE<DATE> in una tabella. In questa colonna viene inserita una riga utilizzando l'API tabledata.insertAll.

{
    "f_range_date": {
        "start": "1970-01-02",
        "end": null
    }
}

Disponibilità dei dati di streaming

I dati sono disponibili per l'analisi in tempo reale utilizzando le query GoogleSQL immediatamente dopo che BigQuery ha confermato correttamente una richiesta tabledata.insertAll.

Le righe trasmesse in streaming di recente a una tabella partizionata in base al tempo di importazione hanno temporaneamente un valore NULL per la pseudocolonna _PARTITIONTIME. Per queste righe, BigQuery assegna in background il valore finale non NULL della colonna PARTITIONTIME, in genere entro pochi minuti. In rari casi, l'operazione può richiedere fino a 90 minuti.

Alcune righe trasmesse in streaming di recente potrebbero non essere disponibili per la copia della tabella in genere per alcuni minuti. In rari casi, l'operazione può richiedere fino a 90 minuti. Per verificare se i dati sono disponibili per la copia della tabella, controlla la risposta tables.get per una sezione denominata streamingBuffer. Se la sezione streamingBuffer non è presente, i tuoi dati sono disponibili per la copia. Puoi anche utilizzare il campo streamingBuffer.oldestEntryTime per identificare l'età dei record nel buffer di streaming.

Deduplicazione best effort

Quando fornisci insertId per una riga inserita, BigQuery utilizza questo ID per supportare la deduplicazione con il massimo impegno per un massimo di un minuto. ovvero, se trasmetti in streaming la stessa riga con lo stesso insertId più di una volta nello stesso periodo di tempo nella stessa tabella, BigQuery potrebbe deduplicare le più occorrenze di quella riga, conservandone solo una.

Il sistema prevede che le righe fornite con insertId identici siano identiche. Se due righe hanno insertId identici, non è deterministico quale riga BigQuery conserva.

La deduplicazione è generalmente pensata per gli scenari di ripetizione in un sistema distribuito in cui non è possibile determinare lo stato di un inserimento di flussi di dati in determinate condizioni di errore, ad esempio errori di rete tra il sistema e BigQuery o errori interni a BigQuery. Se riprovi un inserimento, utilizza lo stesso insertId per lo stesso insieme di righe in modo che BigQuery possa tentare di deduplicare i dati. Per ulteriori informazioni, consulta la sezione Risoluzione dei problemi relativi agli inserimenti di flussi di dati.

La deduplicazione offerta da BigQuery è il risultato di un impegno massimo e non deve essere considerata un meccanismo per garantire l'assenza di duplicati nei tuoi dati. Inoltre, BigQuery potrebbe ridurre la qualità della deduplicazione ottimale in qualsiasi momento per garantire una maggiore affidabilità e disponibilità dei tuoi dati.

Se hai requisiti di deduplicazione rigorosi per i tuoi dati, Google Cloud Datastore è un servizio alternativo che supporta le transazioni.

Disattivare la deduplicazione best effort

Puoi disattivare la deduplicazione best effort non compilando il campo insertId per ogni riga inserita. Questo è il modo consigliato per inserire i dati.

Apache Beam e Dataflow

Per disattivare la deduplicazione con il massimo impegno quando utilizzi il connettore BigQuery I/O di Apache Beam per Java, utilizza il metodo ignoreInsertIds().

Rimozione manuale dei duplicati

Per assicurarti che non esistano righe duplicate al termine dello streaming, utilizza la seguente procedura manuale:

  1. Aggiungi insertId come colonna nello schema della tabella e includi il valore insertId nei dati per ogni riga.
  2. Dopo l'interruzione dello streaming, esegui la seguente query per verificare la presenza di duplicati:

    #standardSQL
    SELECT
      MAX(count) FROM(
      SELECT
        ID_COLUMN,
        count(*) as count
      FROM
        `TABLE_NAME`
      GROUP BY
        ID_COLUMN)

    Se il risultato è maggiore di 1, esistono duplicati.
  3. Per rimuovere i duplicati, esegui la seguente query. Specifica una tabella di destinazione, consenti risultati di grandi dimensioni e disattiva l'appiattimento dei risultati.

    #standardSQL
    SELECT
      * EXCEPT(row_number)
    FROM (
      SELECT
        *,
        ROW_NUMBER()
              OVER (PARTITION BY ID_COLUMN) row_number
      FROM
        `TABLE_NAME`)
    WHERE
      row_number = 1

Note sulla query di rimozione dei duplicati:

  • La strategia più sicura per la query di rimozione dei duplicati è quella di scegliere come target una nuova tabella. In alternativa, puoi scegliere come target la tabella di origine con la disposizione di scrittura WRITE_TRUNCATE.
  • La query di rimozione dei duplicati aggiunge una colonna row_number con il valore 1 alla fine dello schema della tabella. La query utilizza un'istruzione SELECT * EXCEPT di GoogleSQL per escludere la colonna row_number dalla tabella di destinazione. Il prefisso #standardSQL attiva GoogleSQL per questa query. In alternativa, puoi selezionare i nomi di colonne specifici per omettere questa colonna.
  • Per eseguire query sui dati in tempo reale con i duplicati rimossi, puoi anche creare una vista sulla tabella utilizzando la query di rimozione dei duplicati. Tieni presente che i costi delle query sulla vista vengono calcolati in base alle colonne selezionate nella vista, il che può comportare dimensioni di byte scansionati elevate.

Eseguire lo streaming in tabelle partizionate in base al tempo

Quando trasmetti dati in streaming a una tabella partizionata in base al tempo, ogni partizione ha un buffer di streaming. Il buffer di streaming viene mantenuto quando esegui un job di caricamento, query o copia che sovrascrive una partizione impostando la proprietà writeDisposition su WRITE_TRUNCATE. Se vuoi rimuovere il buffer di streaming, verifica che sia vuoto chiamando tables.get sulla partizione.

Partizionamento per data di importazione

Quando esegui lo streaming in una tabella partizionata per data di importazione, BigQuery deduce la partizione di destinazione dall'ora UTC corrente.

I dati appena arrivati vengono temporaneamente inseriti nella partizione __UNPARTITIONED__ mentre si trovano nel buffer dei flussi di dati. Quando ci sono dati non partizionati sufficienti, BigQuery li partiziona nella partizione corretta. Tuttavia, non esiste un SLA per il tempo necessario per spostare i dati dalla partizione __UNPARTITIONED__. Una query può escludere i dati nel buffer di streaming da una query filtrando i valori NULL dalla partizione __UNPARTITIONED__ utilizzando una delle pseudocolonne (_PARTITIONTIME o _PARTITIONDATE a seconda del tipo di dati preferito).

Se trasmetti dati in streaming in una tabella partizionata giornalmente, puoi ignorare l'inferenza della data fornendo un decoratore di partizione come parte della richiesta insertAll. Includi il decoratore nel parametro tableId. Ad esempio, puoi trasmettere in streaming alla partizione corrispondente a 2021-03-01 per la tabella table1 utilizzando il decoratore di partizione:

table1$20210301

Quando esegui lo streaming utilizzando un decoratore di partizioni, puoi eseguire lo streaming nelle partizioni entro gli ultimi 31 giorni nel passato e 16 giorni nel futuro rispetto alla data corrente, in base all'ora UTC attuale. Per scrivere nelle partizioni per le date al di fuori di questi limiti consentiti, utilizza un job di caricamento o di query, come descritto in Aggiunta e sovrascrittura dei dati delle tabella partizionata partizionate.

Lo streaming che utilizza un decoratore di partizione è supportato solo per le tabelle partizionate giornalmente. Non è supportata per le tabelle partizionate orarie, mensili o annuali.

Per i test, puoi utilizzare lo strumento a riga di comando bq bq insert. Ad esempio, il seguente comando trasmette in streaming una singola riga a una partizione per la data 1° gennaio 2017 ($20170101) in una tabella partizionata denominata mydataset.mytable:

echo '{"a":1, "b":2}' | bq insert 'mydataset.mytable$20170101'

Partizionamento delle colonne di unità di tempo

Puoi trasmettere dati in streaming in una tabella partizionata in base a una colonna DATE, DATETIME o TIMESTAMP compresa tra 10 anni nel passato e 1 anno nel futuro. I dati al di fuori di questo intervallo vengono rifiutati.

Quando i dati vengono trasmessi in streaming, vengono inizialmente inseriti nella partizione __UNPARTITIONED__. Quando ci sono dati non partizionati sufficienti, BigQuery ripartiziona automaticamente i dati, inserendoli nella partizione appropriata. Tuttavia, non esiste un SLA per il tempo necessario per spostare i dati dalla partizione __UNPARTITIONED__.

  • Nota: le partizioni giornaliere vengono elaborate in modo diverso rispetto a quelle orarie, mensili e annuali. Solo i dati al di fuori dell'intervallo di date (dagli ultimi 7 giorni ai 3 giorni futuri) vengono estratti nella partizione UNPARTITIONED, in attesa di essere ripartizionati. D'altra parte, per la tabella partizionata oraria, i dati vengono sempre estratti nella partizione UNPARTITIONED e ripartizionati in un secondo momento.

Creare tabelle automaticamente utilizzando le tabelle modello

Le tabelle modello forniscono un meccanismo per dividere una tabella logica in molte tabelle più piccole per creare set di dati più piccoli (ad esempio, per ID utente). Le tabelle dei modelli presentano una serie di limitazioni descritte di seguito. Al contrario, le tabelle partizionate e le tabelle raggruppate in cluster sono i modi consigliati per ottenere questo comportamento.

Per utilizzare una tabella modello tramite l'API BigQuery, aggiungi un parametro templateSuffix alla richiesta insertAll. Per lo strumento a riga di comando bq, aggiungi il flag template_suffix al comando insert. Se BigQuery rileva un parametro templateSuffix o il flag template_suffix, considera la tabella di destinazione come un modello di base. Crea una nuova tabella che condivide lo stesso schema della tabella di destinazione e ha un nome che include il suffisso specificato:

<targeted_table_name> + <templateSuffix>

Utilizzando una tabella modello, eviti il sovraccarico di creare ogni tabella singolarmente e specificare lo schema per ciascuna. Devi solo creare un singolo modello e fornire suffissi diversi in modo che BigQuery possa creare le nuove tabelle per te. BigQuery inserisce le tabelle nello stesso progetto e set di dati.

Le tabelle create utilizzando le tabelle modello sono in genere disponibili in pochi secondi. In rari casi, la loro disponibilità potrebbe richiedere più tempo.

Modificare lo schema della tabella del modello

Se modifichi uno schema di tabella modello, tutte le tabelle generate successivamente utilizzano lo schema aggiornato. Le tabelle generate in precedenza non sono interessate, a meno che la tabella esistente non abbia ancora un buffer di streaming.

Per le tabelle esistenti che hanno ancora un buffer di streaming, se modifichi lo schema della tabella modello in modo compatibile con le versioni precedenti, viene aggiornato anche lo schema delle tabelle generate in streaming attivo. Tuttavia, se modifichi lo schema della tabella modello in modo non compatibile con le versioni precedenti, tutti i dati memorizzati nel buffer che utilizzano il vecchio schema vengono persi. Inoltre, non puoi trasmettere in streaming nuovi dati alle tabelle generate esistenti che utilizzano lo schema precedente, ora incompatibile.

Dopo aver modificato uno schema di tabella del modello, attendi che le modifiche siano state propagate prima di provare a inserire nuovi dati o a eseguire query sulle tabelle generate. Le richieste di inserimento di nuovi campi dovrebbero essere completate in pochi minuti. I tentativi di query sui nuovi campi potrebbero richiedere un'attesa più lunga, fino a 90 minuti.

Se vuoi modificare lo schema di una tabella generata, non farlo finché lo streaming tramite la tabella modello non è terminato e la sezione delle statistiche di streaming della tabella generata non è presente nella risposta tables.get(), il che indica che non sono presenti dati memorizzati nella tabella.

Le tabelle partizionate e le tabelle in cluster non presentano le limitazioni precedenti e sono il meccanismo consigliato.

Dettagli della tabella del modello

Valore del suffisso del modello
Il valore di templateSuffix (o --template_suffix) deve contenere solo lettere (a-z, A-Z), numeri (0-9) o trattini bassi (_). La lunghezza massima combinata del nome della tabella e del suffisso della tabella è di 1024 caratteri.
Quota

Le tabelle dei modelli sono soggette a limitazioni della quota di streaming. Il tuo progetto può creare fino a 10 tabelle al secondo con tabelle modello, in modo simile all'API tables.insert. Questa quota si applica solo alle tabelle in fase di creazione, non a quelle in fase di modifica.

Se la tua applicazione deve creare più di 10 tabelle al secondo, ti consigliamo di utilizzare le tabelle in cluster. Ad esempio, puoi inserire l'ID della tabella ad alta cardinalità nella colonna chiave di una singola tabella di clustering.

Durata

La tabella generata eredita il tempo di scadenza dal set di dati. Come per i normali dati di streaming, le tabelle generate non possono essere copiate immediatamente.

Deduplicazione

La deduplicazione avviene solo tra riferimenti uniformi a una tabella di destinazione. Ad esempio, se esegui lo streaming simultaneo in una tabella generata utilizzando sia tabelle modello sia un normale comando insertAll, non viene eseguita la deduplicazione tra le righe inserite dalle tabelle modello e un normale comando insertAll.

Visualizzazioni

La tabella modello e le tabelle generate non devono essere visualizzazioni.

Risolvere i problemi relativi agli inserimenti di flussi di dati

Le sezioni seguenti descrivono come risolvere gli errori che si verificano quando inserisci flussi di dati in BigQuery utilizzando l'API Streaming precedente. Per ulteriori informazioni su come risolvere gli errori di quota per gli inserimenti di flussi di dati, consulta la sezione Errori di quota relativi agli inserimenti di flussi di dati.

Codici di risposta HTTP di errore

Se ricevi un codice di risposta HTTP di errore, ad esempio un errore di rete, non è possibile sapere se l'inserimento dello streaming è andato a buon fine. Se provi a inviare di nuovo la richiesta, potresti ritrovarti con righe duplicate nella tabella. Per proteggere la tabella dalla duplicazione, imposta la proprietà insertId quando invii la richiesta. BigQuery utilizza la proprietà insertId per la deduplicazione.

Se ricevi un errore di autorizzazione, un errore di nome tabella non valido o un errore di quota superata, non vengono inserite righe e l'intera richiesta non va a buon fine.

Codici di risposta HTTP riusciti

Anche se ricevi un codice di risposta HTTP riuscito, devi controllare la proprietà insertErrors della risposta per determinare se gli inserimenti di righe sono riusciti, perché è possibile che BigQuery abbia inserito le righe solo parzialmente. Potresti riscontrare uno dei seguenti scenari:

  • Tutte le righe sono state inserite correttamente. Se la proprietà insertErrors è un elenco vuoto, tutte le righe sono state inserite correttamente.
  • Alcune righe sono state inserite correttamente. Ad eccezione dei casi in cui si verifica una mancata corrispondenza dello schema in una delle righe, le righe indicate nella proprietà insertErrors non vengono inserite e tutte le altre righe vengono inserite correttamente. La proprietà errors contiene informazioni dettagliate sul motivo per cui ogni riga non è stata caricata. La proprietà index indica l'indice di riga basato su zero della richiesta a cui si applica l'errore.
  • Nessuna riga inserita correttamente. Se BigQuery rileva una mancata corrispondenza dello schema nelle singole righe della richiesta, nessuna riga viene inserita e viene restituita una voce insertErrors per ogni riga, anche per quelle che non presentavano una mancata corrispondenza dello schema. Le righe che non presentavano una mancata corrispondenza dello schema hanno un errore con la proprietà reason impostata su stopped e possono essere inviate nuovamente così come sono. Le righe non riuscite includono informazioni dettagliate sulla mancata corrispondenza dello schema. Per informazioni sui tipi di protocol buffer supportati per ogni tipo di dati BigQuery, consulta Conversioni dei tipi di dati.

Errori dei metadati per gli inserimenti di flussi di dati

Poiché l'API di streaming di BigQuery è progettata per tassi di inserimento elevati, le modifiche ai metadati della tabella sottostante sono alla fine coerenti quando interagiscono con il sistema di streaming. Nella maggior parte dei casi, le modifiche ai metadati vengono propagate in pochi minuti, ma durante questo periodo le risposte API potrebbero riflettere lo stato incoerente della tabella.

Alcuni scenari includono:

  • Modifiche allo schema. La modifica dello schema di una tabella che ha ricevuto di recente inserimenti di streaming può causare risposte con errori di mancata corrispondenza dello schema perché il sistema di streaming potrebbe non rilevare immediatamente la modifica dello schema.
  • Creazione/eliminazione di tabelle. Lo streaming a una tabella inesistente restituisce una variante di una risposta notFound. Una tabella creata in una risposta potrebbe non essere riconosciuta immediatamente dai successivi inserimenti di streaming. Analogamente, l'eliminazione o la ricreazione di una tabella può creare un periodo di tempo in cui gli inserimenti streaming vengono effettivamente inviati alla vecchia tabella. Gli inserimenti di streaming potrebbero non essere presenti nella nuova tabella.
  • Troncamento della tabella. Il troncamento dei dati di una tabella (utilizzando un job di query che utilizza writeDisposition di WRITE_TRUNCATE) può causare in modo simile l'eliminazione degli inserimenti successivi durante il periodo di coerenza.

Dati mancanti/non disponibili

Gli inserimenti in streaming risiedono temporaneamente nell'archiviazione ottimizzata per la scrittura, che ha caratteristiche di disponibilità diverse rispetto all'archiviazione gestita. Alcune operazioni in BigQuery non interagiscono con l'archiviazione ottimizzata per la scrittura, ad esempio i job di copia delle tabelle e i metodi API come tabledata.list. I dati di streaming recenti non saranno presenti nella tabella di destinazione o nell'output.