Von Cloud Storage nach Dataflow lesen

Verwenden Sie einen der E/A-Connectors TextIO oder AvroIO von Apache Beam, um Daten aus Cloud Storage in Dataflow zu lesen.

Google Cloud -Bibliotheksabhängigkeit einfügen

Schließen Sie die folgende Abhängigkeit ein, um den TextIO- oder AvroIO-Connector mit Cloud Storage zu verwenden. Diese Bibliothek bietet einen Schema-Handler für "gs://"-Dateinamen.

Java

<dependency>
  <groupId>org.apache.beam</groupId>
  <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  <version>${beam.version}</version>
</dependency>

Python

apache-beam[gcp]==VERSION

Go

import _ "github.com/apache/beam/sdks/v2/go/pkg/beam/io/filesystem/gcs"

Weitere Informationen finden Sie unter Apache Beam SDK installieren.

gRPC für Apache Beam-E/A-Connector in Dataflow aktivieren

Sie können über den Apache Beam-E/A-Connector in Dataflow mit gRPC eine Verbindung zu Cloud Storage herstellen. gRPC ist ein von Google entwickeltes leistungsstarkes Open-Source-Framework für Remote Procedure Calls (RPC), mit dem Sie mit Cloud Storage interagieren können.

Um die Leseanfragen Ihres Dataflow-Jobs an Cloud Storage zu beschleunigen, können Sie den Apache Beam-E/A-Connector in Dataflow für die Verwendung von gRPC aktivieren.

Befehlszeile

  1. Achten Sie darauf, dass Sie das Apache Beam SDK in der Version 2.55.0 oder höher verwenden.
  2. Verwenden Sie die Pipelineoption --additional-experiments=use_grpc_for_gcs, um einen Dataflow-Job auszuführen. Informationen zu den verschiedenen Pipelineoptionen finden Sie unter Optionale Flags.

Apache Beam SDK

  1. Achten Sie darauf, dass Sie das Apache Beam SDK in der Version 2.55.0 oder höher verwenden.
  2. Verwenden Sie die Pipelineoption --experiments=use_grpc_for_gcs, um einen Dataflow-Job auszuführen. Informationen zu den verschiedenen Pipelineoptionen finden Sie unter Einfache Optionen.

Sie können den Apache Beam-E/A-Connector in Dataflow so konfigurieren, dass gRPC-bezogene Messwerte in Cloud Monitoring generiert werden. Die gRPC-bezogenen Messwerte können Ihnen bei Folgendem helfen:

  • Leistung von gRPC-Anfragen an Cloud Storage überwachen und optimieren
  • Probleme beheben und debuggen
  • Einblick in die Nutzung und das Verhalten Ihrer Anwendung erhalten

Informationen zum Konfigurieren des Apache Beam-E/A-Connectors in Dataflow zum Generieren von gRPC-bezogenen Messwerten finden Sie unter Clientseitige Messwerte verwenden. Wenn das Erfassen von Messwerten für Ihren Anwendungsfall nicht erforderlich ist, können Sie die Erfassung von Messwerten deaktivieren. Eine Anleitung finden Sie unter Clientseitige Messwerte deaktivieren.

Parallelität

Die Connectors TextIO und AvroIO unterstützen zwei Parallelitätsebenen:

  • Einzelne Dateien werden separat indexiert, sodass sie von mehreren Workern gelesen werden können.
  • Wenn die Dateien nicht komprimiert sind, kann der Connector Unterbereiche jeder Datei separat lesen, was zu einem sehr hohen Parallelitätsgrad führt. Diese Aufteilung ist nur möglich, wenn jede Zeile in der Datei einen aussagekräftigen Datensatz darstellt. Sie ist beispielsweise für JSON-Dateien nicht standardmäßig verfügbar.

Leistung

Die folgende Tabelle enthält Leistungsmesswerte für das Lesen aus Cloud Storage. Die Arbeitslasten wurden auf einem e2-standard2-Worker mit dem Apache Beam SDK 2.49.0 für Java ausgeführt. Runner v2 wurde nicht verwendet.

100 Mio. Datensätze | 1 KB | 1 Spalte Durchsatz (Byte) Durchsatz (Elemente)
Lesen 320 Mbit/s 320.000 Elemente pro Sekunde

Diese Messwerte basieren auf einfachen Batch-Pipelines. Sie dienen zum Vergleich der Leistung zwischen E/A-Anschlüssen und sind nicht unbedingt repräsentativ für reale Pipelines. Die Leistung der Dataflow-Pipeline ist komplex und eine Funktion des VM-Typs, der verarbeiteten Daten, der Leistung externer Quellen und Senken sowie des Nutzercodes. Die Messwerte basieren auf der Ausführung des Java SDK und sind nicht repräsentativ für die Leistungsmerkmale anderer Sprach-SDKs. Weitere Informationen finden Sie unter Beam E/A-Leistung.

Best Practices

  • Vermeiden Sie die Verwendung von watchForNewFiles mit Cloud Storage. Dieser Ansatz ist für große Produktionspipelines schlecht skalierbar, da der Connector eine Liste der gesehenen Dateien im Arbeitsspeicher vorhalten muss. Die Liste kann nicht aus dem Arbeitsspeicher entfernt werden, was den Arbeitsspeicher der Worker im Laufe der Zeit verringert. Verwenden Sie stattdessen Pub/Sub-Benachrichtigungen für Cloud Storage. Weitere Informationen finden Sie unter Muster für die Dateiverarbeitung.

  • Wenn sowohl der Dateiname als auch der Dateiinhalt nützliche Daten sind, verwenden Sie die Klasse FileIO, um Dateinamen zu lesen. Ein Dateiname kann beispielsweise Metadaten enthalten, die bei der Verarbeitung der Daten in der Datei nützlich sind. Weitere Informationen finden Sie unter Auf Dateinamen zugreifen. Die Dokumentation zu FileIO enthält ebenfalls ein Beispiel für dieses Muster.

Beispiel

Im folgenden Beispiel wird gezeigt, wie Daten aus Cloud Storage gelesen werden.

Java

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class ReadFromStorage {
  public static Pipeline createPipeline(Options options) {
    var pipeline = Pipeline.create(options);
    pipeline
        // Read from a text file.
        .apply(TextIO.read().from(
            "gs://" + options.getBucket() + "/*.txt"))
        .apply(
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (x -> {
                      System.out.println(x);
                      return x;
                    })));
    return pipeline;
  }
}

Nächste Schritte