Lettura da BigQuery a Dataflow

Questo documento descrive come leggere i dati da BigQuery in Dataflow.

Panoramica

Per la maggior parte dei casi d'uso, ti consigliamo di utilizzare Managed I/O per leggere da BigQuery. Managed I/O fornisce funzionalità come upgrade automatici e un'API di configurazione coerente. Quando legge da BigQuery, Managed I/O esegue letture dirette delle tabelle che offrono le migliori prestazioni di lettura.

Se hai bisogno di un'ottimizzazione delle prestazioni più avanzata, valuta l'utilizzo del connettore BigQueryIO. Il connettore BigQueryIO supporta sia le letture dirette delle tabelle sia la lettura dai job di esportazione BigQuery. Offre inoltre un controllo più granulare sulla deserializzazione dei record della tabella. Per saperne di più, consulta Utilizzare il connettore BigQueryIO in questo documento.

Proiezione e filtro delle colonne

Per ridurre il volume di dati letti dalla pipeline da BigQuery, puoi utilizzare le seguenti tecniche:

  • La proiezione delle colonne specifica un sottoinsieme di colonne da leggere dalla tabella. Utilizza la proiezione delle colonne quando la tabella ha un numero elevato di colonne e devi leggere solo un sottoinsieme di queste.
  • Filtro righe specifica un predicato da applicare alla tabella. L'operazione di lettura di BigQuery restituisce solo le righe che corrispondono al filtro, il che può ridurre la quantità totale di dati importati dalla pipeline.

L'esempio seguente legge le colonne "user_name" e "age" di una tabella e filtra le righe che non corrispondono al predicato "age > 18". Questo esempio utilizza Managed I/O.

Java

Per eseguire l'autenticazione in Dataflow, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configura l'autenticazione per un ambiente di sviluppo locale.

import com.google.common.collect.ImmutableMap;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

public class BigQueryReadWithProjectionAndFiltering {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    String tableSpec = String.format("%s:%s.%s",
        options.getProjectId(),
        options.getDatasetName(),
        options.getTableName());

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("table", tableSpec)
        .put("row_restriction", "age > 18")
        .put("fields", List.of("user_name", "age"))
        .build();

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(Managed.read(Managed.BIGQUERY).withConfig(config)).getSinglePCollection()
        .apply(MapElements
            .into(TypeDescriptors.strings())
            // Access individual fields in the row.
            .via((Row row) -> {
              String output = String.format("Name: %s, Age: %s%n",
                  row.getString("user_name"),
                  row.getInt64("age"));
              System.out.println(output);
              return output;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Leggere da un risultato di query

Il seguente esempio utilizza Managed I/O per leggere il risultato di una query SQL. Esegue una query su un set di dati pubblico BigQuery. Puoi anche utilizzare query SQL per leggere da una vista o da una vista materializzata BigQuery.

Java

Per eseguire l'autenticazione in Dataflow, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configura l'autenticazione per un ambiente di sviluppo locale.

import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

public class BigQueryReadFromQuery {
  public static void main(String[] args) {
    // The SQL query to run inside BigQuery.
    final String queryString =
        "SELECT repo_name as repo, COUNT(*) as count "
            + "FROM `bigquery-public-data.github_repos.sample_commits` "
            + "GROUP BY repo_name";

    // Parse the pipeline options passed into the application.
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().create();

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("query", queryString)
        .build();

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(Managed.read(Managed.BIGQUERY).withConfig(config)).getSinglePCollection()
        .apply(MapElements
            .into(TypeDescriptors.strings())
            // Access individual fields in the row.
            .via((Row row) -> {
              String output = String.format("Repo: %s, commits: %d%n",
                  row.getString("repo"),
                  row.getInt64("count"));
              System.out.println(output);
              return output;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Utilizzare il connettore BigQueryIO

Il connettore BigQueryIO supporta i seguenti metodi di serializzazione:

Il connettore supporta due opzioni per la lettura dei dati:

  • Esporta job. Per impostazione predefinita, il connettore BigQueryIO esegue un job di esportazione BigQuery che scrive i dati della tabella in Cloud Storage. Il connettore legge quindi i dati da Cloud Storage.
  • Letture dirette delle tabelle. Questa opzione è più veloce dei job di esportazione perché utilizza l'API BigQuery Storage Read e salta il passaggio di esportazione. Per utilizzare le letture dirette delle tabelle, chiama withMethod(Method.DIRECT_READ) quando crei la pipeline.

Quando scegli l'opzione da utilizzare, considera i seguenti punti:

  • In generale, consigliamo di utilizzare le letture dirette delle tabelle. L'API Storage Read è più adatta alle pipeline di dati rispetto ai job di esportazione, perché non richiede il passaggio intermedio dell'esportazione dei dati.

  • Se utilizzi le letture dirette, ti viene addebitato l'utilizzo dell'API Storage Read. Consulta i prezzi dell'estrazione dei dati nella pagina Prezzi di BigQuery.

  • Non sono previsti costi aggiuntivi per i job di esportazione. Tuttavia, i job di esportazione hanno dei limiti. Per lo spostamento di grandi quantità di dati, in cui la tempestività è una priorità e il costo è regolabile, sono consigliate le letture dirette.

  • L'API Storage di lettura ha limiti di quota. Utilizza le metricheGoogle Cloud per monitorare l'utilizzo della quota.

  • Se utilizzi i job di esportazione, imposta l'--tempLocation opzione della pipeline per specificare un bucket Cloud Storage per i file esportati.

  • Quando utilizzi l'API Storage di lettura, potresti visualizzare errori di scadenza del lease e timeout della sessione nei log, ad esempio:

    • DEADLINE_EXCEEDED
    • Server Unresponsive
    • StatusCode.FAILED_PRECONDITION details = "there was an error operating on 'projects/<projectID>/locations/<location>/sessions/<sessionID>/streams/<streamID>': session`

    Questi errori possono verificarsi quando un'operazione richiede più tempo del timeout, in genere nelle pipeline che vengono eseguite per più di 6 ore. Per risolvere il problema, passa alle esportazioni di file.

  • Il grado di parallelismo dipende dal metodo di lettura:

    • Letture dirette: il connettore I/O produce un numero dinamico di stream in base alle dimensioni della richiesta di esportazione. Legge questi stream direttamente da BigQuery in parallelo.

    • Job di esportazione: BigQuery determina il numero di file da scrivere in Cloud Storage. Il numero di file dipende dalla query e dal volume di dati. Il connettore I/O legge i file esportati in parallelo.

La tabella seguente mostra le metriche di rendimento per varie opzioni di lettura BigQuery I/O. I carichi di lavoro sono stati eseguiti su un worker e2-standard2, utilizzando l'SDK Apache Beam 2.49.0 per Java. Non ha utilizzato Runner v2.

100 milioni di record | 1 kB | 1 colonna Velocità effettiva (byte) Throughput (elementi)
Storage Read 120 MBps 88.000 elementi al secondo
Esportazione Avro 105 MB/s 78.000 elementi al secondo
Esportazione JSON 110 MBps 81.000 elementi al secondo

Queste metriche si basano su semplici pipeline batch. Sono pensati per confrontare le prestazioni tra i connettori I/O e non sono necessariamente rappresentativi delle pipeline reali. Il rendimento della pipeline Dataflow è complesso e dipende dal tipo di VM, dai dati in fase di elaborazione, dal rendimento delle origini e dei sink esterni e dal codice utente. Le metriche si basano sull'esecuzione dell'SDK Java e non sono rappresentative delle caratteristiche di prestazioni di altri SDK di linguaggio. Per saperne di più, consulta Prestazioni di Beam IO.

Esempi

I seguenti esempi di codice utilizzano il connettore BigQueryIO con letture dirette delle tabelle. Per utilizzare un job di esportazione, ometti la chiamata a withMethod.

Leggere i record formattati in Avro

Questo esempio mostra come utilizzare il connettore BigQueryIO per leggere i record in formato Avro.

Per leggere i dati BigQuery in record formattati in Avro, utilizza il metodo read(SerializableFunction). Questo metodo accetta una funzione definita dall'applicazione che analizza gli oggetti SchemaAndRecord e restituisce un tipo di dati personalizzato. L'output del connettore è un PCollection del tuo tipo di dati personalizzato.

Il seguente codice legge un PCollection<MyData> da una tabella BigQuery, dove MyData è una classe definita dall'applicazione.

Java

Per eseguire l'autenticazione in Dataflow, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configura l'autenticazione per un ambiente di sviluppo locale.

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadAvro {

  // A custom datatype to hold a record from the source table.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    // Function to convert Avro records to MyData instances.
    public static class FromSchemaAndRecord
            implements SerializableFunction<SchemaAndRecord, MyData> {
      @Override public MyData apply(SchemaAndRecord elem) {
        MyData data = new MyData();
        GenericRecord record = elem.getRecord();
        data.name = ((Utf8) record.get("user_name")).toString();
        data.age = (Long) record.get("age");
        return data;
      }
    }
  }

  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into Avro records, using an application-defined parsing function.
        .apply(BigQueryIO.read(new MyData.FromSchemaAndRecord())
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(TypedRead.Method.DIRECT_READ))
        // The output from the previous step is a PCollection<MyData>.
        .apply(MapElements
            .into(TypeDescriptor.of(MyData.class))
            .via((MyData x) -> {
              System.out.printf("Name: %s, Age: %d%n", x.name, x.age);
              return x;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Il metodo read accetta un'interfaccia SerializableFunction<SchemaAndRecord, T>, che definisce una funzione per la conversione dai record Avro a una classe di dati personalizzata. Nell'esempio di codice precedente, il metodo MyData.apply implementa questa funzione di conversione. La funzione di esempio analizza i campi name e age dal record Avro e restituisce un'istanza MyData.

Per specificare la tabella BigQuery da leggere, chiama il metodo from, come mostrato nell'esempio precedente. Per ulteriori informazioni, consulta la sezione Nomi delle tabelle nella documentazione del connettore BigQuery I/O.

Leggi TableRow oggetti

Questo esempio mostra come utilizzare il connettore BigQueryIO per leggere gli oggetti TableRow.

Il metodo readTableRows legge i dati BigQuery in un PCollection di oggetti TableRow. Ogni TableRow è una mappa di coppie chiave-valore che contiene una singola riga di dati della tabella. Specifica la tabella BigQuery da leggere chiamando il metodo from.

Il seguente codice legge un PCollection<TableRows> da una tabella BigQuery.

Java

Per eseguire l'autenticazione in Dataflow, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configura l'autenticazione per un ambiente di sviluppo locale.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BiqQueryReadTableRows {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into TableRow objects.
        .apply(BigQueryIO.readTableRows()
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(Method.DIRECT_READ)
        )
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            // Use TableRow to access individual fields in the row.
            .via((TableRow row) -> {
              var name = (String) row.get("user_name");
              var age = (String) row.get("age");
              System.out.printf("Name: %s, Age: %s%n", name, age);
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Questo esempio mostra anche come accedere ai valori del dizionario TableRow. I valori interi vengono codificati come stringhe in modo che corrispondano al formato JSON esportato di BigQuery.

Passaggi successivi