Von BigQuery in Dataflow lesen

In diesem Dokument wird beschrieben, wie Sie Daten aus BigQuery in Dataflow einlesen.

Übersicht

In den meisten Anwendungsfällen sollten Sie Managed I/O verwenden, um Daten aus BigQuery zu lesen. Managed I/O bietet Funktionen wie automatische Upgrades und eine einheitliche Konfigurations-API. Beim Lesen aus BigQuery führt Managed I/O direkte Tabellenlesevorgänge aus, die die beste Leseleistung bieten.

Wenn Sie eine erweiterte Leistungsoptimierung benötigen, sollten Sie den BigQueryIO-Connector verwenden. Der BigQueryIO-Connector unterstützt sowohl das direkte Lesen von Tabellen als auch das Lesen aus BigQuery-Exportjobs. Außerdem bietet sie eine detailliertere Steuerung der Deserialisierung von Tabelleneinträgen. Weitere Informationen finden Sie in diesem Dokument unter BigQueryIO-Connector verwenden.

Spaltenprojektion und ‑filterung

Um das Datenvolumen zu reduzieren, das von Ihrer Pipeline aus BigQuery gelesen wird, können Sie die folgenden Methoden verwenden:

  • Mit der Spaltenprojektion wird eine Teilmenge von Spalten angegeben, die aus der Tabelle gelesen werden sollen. Verwenden Sie die Spaltenprojektion, wenn Ihre Tabelle eine große Anzahl von Spalten enthält und Sie nur eine Teilmenge davon lesen müssen.
  • Mit Zeilenfilterung wird ein Prädikat angegeben, das auf die Tabelle angewendet werden soll. Der BigQuery-Lesevorgang gibt nur Zeilen zurück, die dem Filter entsprechen. Dadurch kann die Gesamtmenge der von der Pipeline aufgenommenen Daten reduziert werden.

Im folgenden Beispiel werden die Spalten "user_name" und "age" aus einer Tabelle gelesen und Zeilen herausgefiltert, die nicht mit dem Prädikat "age > 18" übereinstimmen. In diesem Beispiel wird Managed I/O verwendet.

Java

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import com.google.common.collect.ImmutableMap;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

public class BigQueryReadWithProjectionAndFiltering {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    String tableSpec = String.format("%s:%s.%s",
        options.getProjectId(),
        options.getDatasetName(),
        options.getTableName());

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("table", tableSpec)
        .put("row_restriction", "age > 18")
        .put("fields", List.of("user_name", "age"))
        .build();

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(Managed.read(Managed.BIGQUERY).withConfig(config)).getSinglePCollection()
        .apply(MapElements
            .into(TypeDescriptors.strings())
            // Access individual fields in the row.
            .via((Row row) -> {
              String output = String.format("Name: %s, Age: %s%n",
                  row.getString("user_name"),
                  row.getInt64("age"));
              System.out.println(output);
              return output;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Aus einem Abfrageergebnis lesen

Im folgenden Beispiel wird Managed I/O verwendet, um das Ergebnis einer SQL-Abfrage zu lesen. Dabei wird eine Abfrage für ein öffentliches BigQuery-Dataset ausgeführt. Sie können auch SQL-Abfragen verwenden, um Daten aus einer BigQuery-Ansicht oder einer materialisierten Ansicht zu lesen.

Java

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptors;

public class BigQueryReadFromQuery {
  public static void main(String[] args) {
    // The SQL query to run inside BigQuery.
    final String queryString =
        "SELECT repo_name as repo, COUNT(*) as count "
            + "FROM `bigquery-public-data.github_repos.sample_commits` "
            + "GROUP BY repo_name";

    // Parse the pipeline options passed into the application.
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation().create();

    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("query", queryString)
        .build();

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply(Managed.read(Managed.BIGQUERY).withConfig(config)).getSinglePCollection()
        .apply(MapElements
            .into(TypeDescriptors.strings())
            // Access individual fields in the row.
            .via((Row row) -> {
              String output = String.format("Repo: %s, commits: %d%n",
                  row.getString("repo"),
                  row.getInt64("count"));
              System.out.println(output);
              return output;
            }));
    pipeline.run().waitUntilFinish();
  }
}

BigQueryIO-Connector verwenden

Der BigQueryIO-Connector unterstützt die folgenden Serialisierungsmethoden:

  • Daten als Avro-formatierte Datensätze lesen Bei dieser Methode stellen Sie eine Funktion bereit, die die Avro-Datensätze in einen benutzerdefinierten Datentyp parst.
  • Daten als TableRow-Objekte lesen Diese Methode ist praktisch, da kein benutzerdefinierter Datentyp erforderlich ist. Die Leistung ist jedoch in der Regel geringer als beim Lesen von Datensätzen im Avro-Format.

Der Connector unterstützt zwei Optionen zum Lesen von Daten:

  • Exportjob: Standardmäßig wird mit dem BigQueryIO-Connector ein BigQuery-Exportjob ausgeführt, mit dem die Tabellendaten in Cloud Storage geschrieben werden. Der Connector liest dann die Daten aus Cloud Storage.
  • Direktes Lesen von Tabellen: Diese Option ist schneller als Exportjobs, da sie die BigQuery Storage Read API verwendet und der Exportschritt übersprungen wird. Wenn Sie direkte Tabellenzugriffe verwenden möchten, rufen Sie withMethod(Method.DIRECT_READ) auf, wenn Sie die Pipeline erstellen.

Berücksichtigen Sie bei der Auswahl der Option Folgendes:

  • Im Allgemeinen empfehlen wir, Tabellen direkt zu lesen. Die Storage Read API eignet sich besser für Datenpipelines als Exportjobs, da kein Zwischenschritt zum Exportieren von Daten erforderlich ist.

  • Wenn Sie direkte Lesevorgänge verwenden, wird Ihnen die Nutzung der Storage Read API in Rechnung gestellt. Weitere Informationen finden Sie auf der Seite BigQuery-Preise unter Preise für die Datenextraktion.

  • Für Exportjobs fallen keine zusätzlichen Kosten an. Für Exportjobs gelten jedoch Limits. Bei großen Datenmengen, bei denen Aktualität Priorität hat und die Kosten angepasst werden können, werden direkte Lesevorgänge empfohlen.

  • Für die Storage Read API gelten Kontingentlimits. Verwenden Sie Google Cloud -Messwerte, um die Kontingentnutzung zu überwachen.

  • Wenn Sie Exportjobs verwenden, legen Sie die --tempLocation-Pipelineoption fest, um einen Cloud Storage-Bucket für die exportierten Dateien anzugeben.

  • Bei Verwendung der Storage Read API werden in den Logs möglicherweise Fehler zu Ablauf des Leasingvertrags und Zeitüberschreitung der Sitzung angezeigt, z. B.:

    • DEADLINE_EXCEEDED
    • Server Unresponsive
    • StatusCode.FAILED_PRECONDITION details = "there was an error operating on 'projects/<projectID>/locations/<location>/sessions/<sessionID>/streams/<streamID>': session`

    Diese Fehler können auftreten, wenn ein Vorgang länger als das Zeitlimit dauert, in der Regel bei Pipelines, die länger als 6 Stunden ausgeführt werden. Um dieses Problem zu beheben, sollten Sie auf den Export von Dateien umstellen.

  • Der Grad der Parallelität hängt von der Lesemethode ab:

    • Direktes Lesen: Der E/A-Connector erzeugt eine dynamische Anzahl von Streams, die von der Größe der Exportanfrage abhängt. Die Streams werden parallel direkt aus BigQuery gelesen.

    • Exportjobs: BigQuery bestimmt, in wie viele Dateien in Cloud Storage geschrieben wird. Die Anzahl der Dateien hängt von der Abfrage und dem Datenvolumen ab. Der E/A-Connector liest die exportierten Dateien parallel.

Die folgende Tabelle enthält Leistungsmesswerte für verschiedene BigQuery-E/A-Leseoptionen. Die Arbeitslasten wurden auf einem e2-standard2-Worker mit dem Apache Beam SDK 2.49.0 für Java ausgeführt. Runner v2 wurde nicht verwendet.

100 Mio. Datensätze | 1 KB | 1 Spalte Durchsatz (Byte) Durchsatz (Elemente)
Speicherlesevorgänge 120 Mbit/s 88.000 Elemente pro Sekunde
Avro-Export 105 Mbit/s 78.000 Elemente pro Sekunde
JSON-Export 110 Mbit/s 81.000 Elemente pro Sekunde

Diese Messwerte basieren auf einfachen Batch-Pipelines. Sie dienen zum Vergleich der Leistung zwischen E/A-Anschlüssen und sind nicht unbedingt repräsentativ für reale Pipelines. Die Leistung der Dataflow-Pipeline ist komplex und eine Funktion des VM-Typs, der verarbeiteten Daten, der Leistung externer Quellen und Senken sowie des Nutzercodes. Die Messwerte basieren auf der Ausführung des Java SDK und sind nicht repräsentativ für die Leistungsmerkmale anderer Sprach-SDKs. Weitere Informationen finden Sie unter Beam E/A-Leistung.

Beispiele

In den folgenden Codebeispielen wird der BigQueryIO-Connector mit direkten Tabellenzugriffen verwendet. Wenn Sie stattdessen einen Exportjob verwenden möchten, lassen Sie den Aufruf von withMethod weg.

Avro-formatierte Datensätze lesen

In diesem Beispiel wird gezeigt, wie Sie den BigQueryIO-Connector verwenden, um Datensätze im Avro-Format zu lesen.

Verwenden Sie die Methode read(SerializableFunction), um BigQuery-Daten in Avro-formatierte Datensätze einzulesen. Diese Methode verwendet eine anwendungsdefinierte Funktion, die SchemaAndRecord-Objekte parst und einen benutzerdefinierten Datentyp zurückgibt. Die Ausgabe des Connectors ist ein PCollection Ihres benutzerdefinierten Datentyps.

Der folgende Code liest ein PCollection<MyData> aus einer BigQuery-Tabelle, wobei MyData eine anwendungsdefinierte Klasse ist.

Java

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BigQueryReadAvro {

  // A custom datatype to hold a record from the source table.
  @DefaultCoder(AvroCoder.class)
  public static class MyData {
    public String name;
    public Long age;

    // Function to convert Avro records to MyData instances.
    public static class FromSchemaAndRecord
            implements SerializableFunction<SchemaAndRecord, MyData> {
      @Override public MyData apply(SchemaAndRecord elem) {
        MyData data = new MyData();
        GenericRecord record = elem.getRecord();
        data.name = ((Utf8) record.get("user_name")).toString();
        data.age = (Long) record.get("age");
        return data;
      }
    }
  }

  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into Avro records, using an application-defined parsing function.
        .apply(BigQueryIO.read(new MyData.FromSchemaAndRecord())
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(TypedRead.Method.DIRECT_READ))
        // The output from the previous step is a PCollection<MyData>.
        .apply(MapElements
            .into(TypeDescriptor.of(MyData.class))
            .via((MyData x) -> {
              System.out.printf("Name: %s, Age: %d%n", x.name, x.age);
              return x;
            }));
    pipeline.run().waitUntilFinish();
  }
}

Die read-Methode verwendet eine SerializableFunction<SchemaAndRecord, T>-Schnittstelle, die eine Funktion zum Konvertieren von Avro-Datensätzen in eine benutzerdefinierte Datenklasse definiert. Im vorherigen Codebeispiel wird diese Konvertierungsfunktion von der Methode MyData.apply implementiert. Die Beispielfunktion parst die Felder name und age aus dem Avro-Datensatz und gibt eine MyData-Instanz zurück.

Rufen Sie die Methode from auf, um anzugeben, welche BigQuery-Tabelle gelesen werden soll, wie im vorherigen Beispiel gezeigt. Weitere Informationen finden Sie in der Dokumentation zum BigQuery-E/A-Connector unter Tabellennamen.

Lesen von TableRow-Objekten

In diesem Beispiel wird gezeigt, wie Sie mit dem BigQueryIO-Connector TableRow-Objekte lesen.

Mit der Methode readTableRows werden BigQuery-Daten in ein PCollection von TableRow-Objekten eingelesen. Jedes TableRow ist eine Map mit Schlüssel/Wert-Paaren, die eine einzelne Zeile mit Tabellendaten enthält. Geben Sie die BigQuery-Tabelle an, aus der gelesen werden soll, indem Sie die Methode from aufrufen.

Mit dem folgenden Code wird PCollection<TableRows> aus einer BigQuery-Tabelle gelesen.

Java

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class BiqQueryReadTableRows {
  public static void main(String[] args) {
    // Parse the pipeline options passed into the application. Example:
    //   --projectId=$PROJECT_ID --datasetName=$DATASET_NAME --tableName=$TABLE_NAME
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    PipelineOptionsFactory.register(ExamplePipelineOptions.class);
    ExamplePipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(ExamplePipelineOptions.class);

    // Create a pipeline and apply transforms.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // Read table data into TableRow objects.
        .apply(BigQueryIO.readTableRows()
            .from(String.format("%s:%s.%s",
                options.getProjectId(),
                options.getDatasetName(),
                options.getTableName()))
            .withMethod(Method.DIRECT_READ)
        )
        // The output from the previous step is a PCollection<TableRow>.
        .apply(MapElements
            .into(TypeDescriptor.of(TableRow.class))
            // Use TableRow to access individual fields in the row.
            .via((TableRow row) -> {
              var name = (String) row.get("user_name");
              var age = (String) row.get("age");
              System.out.printf("Name: %s, Age: %s%n", name, age);
              return row;
            }));
    pipeline.run().waitUntilFinish();
  }
}

In diesem Beispiel wird auch gezeigt, wie auf die Werte aus dem TableRow-Dictionary zugegriffen wird. Ganzzahlige Werte werden als Strings codiert, um dem exportierten JSON-Format von BigQuery zu entsprechen.

Nächste Schritte